|
| 1 | +# Score 整体流程 # |
| 2 | + |
| 3 | +在 `sched.Algorithm.Schedule()` 中执行完 PreScore 扩展点之后,会进入具体的节点打分流程。 |
| 4 | + |
| 5 | +``` go |
| 6 | + ... |
| 7 | + |
| 8 | + startPriorityEvalTime := time.Now() |
| 9 | + // When only one node after predicate, just use it. |
| 10 | + if len(filteredNodes) == 1 { |
| 11 | + metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime)) |
| 12 | + return ScheduleResult{ |
| 13 | + SuggestedHost: filteredNodes[0].Name, |
| 14 | + EvaluatedNodes: 1 + len(filteredNodesStatuses), |
| 15 | + FeasibleNodes: 1, |
| 16 | + }, nil |
| 17 | + } |
| 18 | + |
| 19 | + priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, filteredNodes) |
| 20 | + if err != nil { |
| 21 | + return result, err |
| 22 | + } |
| 23 | + ... |
| 24 | + |
| 25 | + host, err := g.selectHost(priorityList) |
| 26 | + trace.Step("Prioritizing done") |
| 27 | + |
| 28 | + return ScheduleResult{ |
| 29 | + SuggestedHost: host, |
| 30 | + EvaluatedNodes: len(filteredNodes) + len(filteredNodesStatuses), |
| 31 | + FeasibleNodes: len(filteredNodes), |
| 32 | + }, err |
| 33 | +} |
| 34 | +``` |
| 35 | + |
| 36 | +首先判断候选节点的数量,如果只有一个,就没有必要进行 Score 扩展点,会直接返回。 |
| 37 | + |
| 38 | +否则会通过调用 `prioritizeNodes()` 对节点进行打分,打分结束后使用 `selectHost()` 来选一个最优的节点,最后返回所选的节点和一些其它相关信息。先来看下 `selectHost()` 函数: |
| 39 | + |
| 40 | +``` go |
| 41 | +func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) { |
| 42 | + if len(nodeScoreList) == 0 { |
| 43 | + return "", fmt.Errorf("empty priorityList") |
| 44 | + } |
| 45 | + maxScore := nodeScoreList[0].Score |
| 46 | + selected := nodeScoreList[0].Name |
| 47 | + cntOfMaxScore := 1 |
| 48 | + for _, ns := range nodeScoreList[1:] { |
| 49 | + if ns.Score > maxScore { |
| 50 | + maxScore = ns.Score |
| 51 | + selected = ns.Name |
| 52 | + cntOfMaxScore = 1 |
| 53 | + } else if ns.Score == maxScore { |
| 54 | + cntOfMaxScore++ |
| 55 | + if rand.Intn(cntOfMaxScore) == 0 { |
| 56 | + // Replace the candidate with probability of 1/cntOfMaxScore |
| 57 | + selected = ns.Name |
| 58 | + } |
| 59 | + } |
| 60 | + } |
| 61 | + return selected, nil |
| 62 | +} |
| 63 | +``` |
| 64 | + |
| 65 | +它会选择得分最高的节点;如果得分最高的节点有多个,则随机选择一个。 |
| 66 | + |
| 67 | +## prioritizeNodes() ## |
| 68 | + |
| 69 | +现在回头看 `prioritizeNodes()`,这个函数的主要作用是同时调用新旧两种扩展方式实现的打分功能。新的方式就是调度框架(Framework),旧的方式是通过调度器扩展来实现,我们主要关注前者,会在下一节详细分析其中的 Score 和 NormalizeScore 扩展点的执行,而旧的方式相关调用可以自行参阅相关代码。 |
| 70 | + |
| 71 | +``` go |
| 72 | +func (g *genericScheduler) prioritizeNodes( |
| 73 | + ctx context.Context, |
| 74 | + prof *profile.Profile, |
| 75 | + state *framework.CycleState, |
| 76 | + pod *v1.Pod, |
| 77 | + nodes []*v1.Node, |
| 78 | +) (framework.NodeScoreList, error) { |
| 79 | + ... |
| 80 | + if len(g.extenders) == 0 && !prof.HasScorePlugins() { |
| 81 | + result := make(framework.NodeScoreList, 0, len(nodes)) |
| 82 | + for i := range nodes { |
| 83 | + result = append(result, framework.NodeScore{ |
| 84 | + Name: nodes[i].Name, |
| 85 | + Score: 1, |
| 86 | + }) |
| 87 | + } |
| 88 | + return result, nil |
| 89 | + } |
| 90 | +``` |
| 91 | +
|
| 92 | +如果没有调度器扩展且调度框架中没有 Score 扩展点的插件,则认为所有节点的得分都是 1,然后将结果返回。 |
| 93 | +
|
| 94 | +``` go |
| 95 | + // Run the Score plugins. |
| 96 | + scoresMap, scoreStatus := prof.RunScorePlugins(ctx, state, pod, nodes) |
| 97 | + if !scoreStatus.IsSuccess() { |
| 98 | + return framework.NodeScoreList{}, scoreStatus.AsError() |
| 99 | + } |
| 100 | + |
| 101 | + // Summarize all scores. |
| 102 | + result := make(framework.NodeScoreList, 0, len(nodes)) |
| 103 | + |
| 104 | + for i := range nodes { |
| 105 | + result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0}) |
| 106 | + for j := range scoresMap { |
| 107 | + result[i].Score += scoresMap[j][i].Score |
| 108 | + } |
| 109 | + } |
| 110 | + |
| 111 | + if len(g.extenders) != 0 && nodes != nil { |
| 112 | + ... |
| 113 | + } |
| 114 | + |
| 115 | + ... |
| 116 | + return result, nil |
| 117 | +} |
| 118 | +``` |
| 119 | + |
| 120 | +接着调用 `RunScorePlugins()` 执行具体的打分流程。将返回的结果进行格式转换,然后存入 `result` 变量中。然后判断是否有调度扩展插件,如果有的话,则执行这些插件对应的 `Prioritize()` 函数(这个不在这里进行分析)。最后将 `result` 结果返回。 |
| 121 | + |
| 122 | +下面分析数据的格式转换过程。 |
| 123 | + |
| 124 | +``` go |
| 125 | + RunScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) (PluginToNodeScores, *Status) |
| 126 | +``` |
| 127 | + |
| 128 | +``` go |
| 129 | +// PluginToNodeScores declares a map from plugin name to its NodeScoreList. |
| 130 | +type PluginToNodeScores map[string]NodeScoreList |
| 131 | + |
| 132 | +// NodeScoreList declares a list of nodes and their scores. |
| 133 | +type NodeScoreList []NodeScore |
| 134 | + |
| 135 | +// NODESCORE is a struct with node name and score. |
| 136 | +type NodeScore struct { |
| 137 | + Name string |
| 138 | + Score int64 |
| 139 | +} |
| 140 | +``` |
| 141 | + |
| 142 | +`RunScorePlugins()` 返回的值是一个 Map 对象。key 为插件名称,value 为由 `NodeScore` 组成的数组,而 `NodeScore` 则包含了节点名称和当前节点的得分。也就是说,value 包含了当前插件在每个节点上执行的得分和节点名称。 |
| 143 | + |
| 144 | +``` go |
| 145 | + result := make(framework.NodeScoreList, 0, len(nodes)) |
| 146 | +``` |
| 147 | + |
| 148 | +而 `result` 则是一个由 `NodeScore` 组成的数组,每个元素包含了节点的名称和当前节点的总得分。 |
| 149 | + |
| 150 | +``` go |
| 151 | + for i := range nodes { |
| 152 | + result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0}) |
| 153 | + for j := range scoresMap { |
| 154 | + result[i].Score += scoresMap[j][i].Score |
| 155 | + } |
| 156 | + } |
| 157 | +``` |
| 158 | + |
| 159 | +在 `result` 里没有了插件名称的信息。这个转换过程相当于进行了汇总,即针对某一个节点,将这个节点上所有插件的得分相加,结果作为当前节点的最终得分。 |
| 160 | + |
| 161 | +下一节对 `RunScorePlugins()` 进行分析,它会计算每个插件在每个节点上的得分计算过程,主要包括 Score 和 NormalizeScore 扩展点的执行。 |
0 commit comments