|
| 1 | +# Reserve 扩展点的执行 # |
| 2 | + |
| 3 | +Reserve 扩展点的作用是在 Pod 与节点绑定之前,做一些相关附属资源的预留,例如 PVC。然后可以在 PreBind 扩展点对这些已经预留的资源进行预绑定,再在 Bind 扩展点对 Pod 和节点之间完成绑定。 |
| 4 | + |
| 5 | +## 数据结构 ## |
| 6 | + |
| 7 | +``` go |
| 8 | +type ReservePlugin interface { |
| 9 | + Plugin |
| 10 | + Reserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status |
| 11 | + Unreserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) |
| 12 | +} |
| 13 | +``` |
| 14 | + |
| 15 | +实现了 Reserve 扩展点的插件必须同时实现 `Reserve()` 和 `Unreserve()` 函数。前者就是预留资源的作用;而后者相当于在执行失败的时候执行相应的回退操作。例如将预留的 PVC 等资源从预留缓存中删除,即不再预留这些资源。 |
| 16 | + |
| 17 | +在 Reserve 以及后续所有操作执行失败时都需要执行 `Unreserve()`。 |
| 18 | + |
| 19 | +## Assume ## |
| 20 | + |
| 21 | +在执行 Reserve 扩展点之前,有一个 Assume 的操作。 |
| 22 | + |
| 23 | +``` go |
| 24 | + assumedPodInfo := podInfo.DeepCopy() |
| 25 | + assumedPod := assumedPodInfo.Pod |
| 26 | + |
| 27 | + err = sched.assume(assumedPod, scheduleResult.SuggestedHost) |
| 28 | + if err != nil { |
| 29 | + ... |
| 30 | + sched.recordSchedulingFailure(fwk, assumedPodInfo, err, SchedulerError, "") |
| 31 | + return |
| 32 | + } |
| 33 | +``` |
| 34 | + |
| 35 | +首先将 Pod 进行深度复制,在后续的所有扩展点执行过程中,修改的都是复制之后的 Pod 对象,这样在发生错误的时候就可以保证原始 Pod 信息仍然维持原样。 |
| 36 | + |
| 37 | +``` go |
| 38 | +func (sched *Scheduler) assume(assumed *v1.Pod, host string) error { |
| 39 | + assumed.Spec.NodeName = host |
| 40 | + |
| 41 | + if err := sched.SchedulerCache.AssumePod(assumed); err != nil { |
| 42 | + klog.Errorf("scheduler cache AssumePod failed: %v", err) |
| 43 | + return err |
| 44 | + } |
| 45 | + // if "assumed" is a nominated pod, we should remove it from internal cache |
| 46 | + if sched.SchedulingQueue != nil { |
| 47 | + sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed) |
| 48 | + } |
| 49 | + |
| 50 | + return nil |
| 51 | +} |
| 52 | +``` |
| 53 | + |
| 54 | +然后执行 `AssumePod()`,作用是将深度复制之后的 Pod 加入到调度器缓存中,以便后续的绑定等操作可以异步执行。 |
| 55 | + |
| 56 | + |
| 57 | +## Reserve 扩展点 ## |
| 58 | + |
| 59 | +执行完 Assume 操作后,会接着执行 Reserve 扩展点。 |
| 60 | + |
| 61 | +``` go |
| 62 | + // Run the Reserve method of reserve plugins. |
| 63 | + if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { |
| 64 | + ... |
| 65 | + fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) |
| 66 | + if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil { |
| 67 | + klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) |
| 68 | + } |
| 69 | + sched.recordSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, "") |
| 70 | + return |
| 71 | + } |
| 72 | +``` |
| 73 | + |
| 74 | +调用 `RunReservePluginsReserve()` 来预留资源,如果执行失败,则需要执行 `Unreserve()` 将已经预留的资源释放掉,并调用 `ForgetPod()` 将 Pod 从调度器缓存中移除,然后记录调度失败并返回。 |
| 75 | + |
| 76 | +``` go |
| 77 | +func (f *frameworkImpl) RunReservePluginsReserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) { |
| 78 | + ... |
| 79 | + for _, pl := range f.reservePlugins { |
| 80 | + status = f.runReservePluginReserve(ctx, pl, state, pod, nodeName) |
| 81 | + if !status.IsSuccess() { |
| 82 | + err := status.AsError() |
| 83 | + klog.ErrorS(err, "Failed running Reserve plugin", "plugin", pl.Name(), "pod", klog.KObj(pod)) |
| 84 | + return framework.AsStatus(fmt.Errorf("running Reserve plugin %q: %w", pl.Name(), err)) |
| 85 | + } |
| 86 | + } |
| 87 | + return nil |
| 88 | +} |
| 89 | + |
| 90 | +func (f *frameworkImpl) runReservePluginReserve(ctx context.Context, pl framework.ReservePlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { |
| 91 | + if !state.ShouldRecordPluginMetrics() { |
| 92 | + return pl.Reserve(ctx, state, pod, nodeName) |
| 93 | + } |
| 94 | + startTime := time.Now() |
| 95 | + status := pl.Reserve(ctx, state, pod, nodeName) |
| 96 | + f.metricsRecorder.observePluginDurationAsync(reserve, pl.Name(), status, metrics.SinceInSeconds(startTime)) |
| 97 | + return status |
| 98 | +} |
| 99 | +``` |
| 100 | + |
| 101 | +会遍历所有实现了 Reserve 扩展点的插件,调用其中的 `Reserve()` 函数。 |
| 102 | + |
| 103 | +目前,只有一个内置插件实现了 Reserve 扩展点,即 VolumeBinding 插件。 |
| 104 | + |
| 105 | +``` go |
| 106 | +func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { |
| 107 | + state, err := getStateData(cs) |
| 108 | + if err != nil { |
| 109 | + return framework.AsStatus(err) |
| 110 | + } |
| 111 | + // we don't need to hold the lock as only one node will be reserved for the given pod |
| 112 | + podVolumes, ok := state.podVolumesByNode[nodeName] |
| 113 | + if ok { |
| 114 | + allBound, err := pl.Binder.AssumePodVolumes(pod, nodeName, podVolumes) |
| 115 | + if err != nil { |
| 116 | + return framework.AsStatus(err) |
| 117 | + } |
| 118 | + state.allBound = allBound |
| 119 | + } else { |
| 120 | + // may not exist if the pod does not reference any PVC |
| 121 | + state.allBound = true |
| 122 | + } |
| 123 | + return nil |
| 124 | +} |
| 125 | +``` |
| 126 | + |
| 127 | +通过调用 `pl.Binder.AssumePodVolumes(pod, nodeName, podVolumes)` 将当前 Pod 所使用的所有 PVC 和对应的 PV 在缓存中进行绑定。注意这里的绑定只是发生在缓存中的,并未提交到 Kubernetes API Server 进行持久化。 |
| 128 | + |
| 129 | +``` go |
| 130 | +func (pl *VolumeBinding) Unreserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) { |
| 131 | + s, err := getStateData(cs) |
| 132 | + if err != nil { |
| 133 | + return |
| 134 | + } |
| 135 | + // we don't need to hold the lock as only one node may be unreserved |
| 136 | + podVolumes, ok := s.podVolumesByNode[nodeName] |
| 137 | + if !ok { |
| 138 | + return |
| 139 | + } |
| 140 | + pl.Binder.RevertAssumedPodVolumes(podVolumes) |
| 141 | + return |
| 142 | +} |
| 143 | + |
| 144 | +func (b *volumeBinder) RevertAssumedPodVolumes(podVolumes *PodVolumes) { |
| 145 | + b.revertAssumedPVs(podVolumes.StaticBindings) |
| 146 | + b.revertAssumedPVCs(podVolumes.DynamicProvisions) |
| 147 | +} |
| 148 | + |
| 149 | +func (b *volumeBinder) revertAssumedPVs(bindings []*BindingInfo) { |
| 150 | + for _, BindingInfo := range bindings { |
| 151 | + b.pvCache.Restore(BindingInfo.pv.Name) |
| 152 | + } |
| 153 | +} |
| 154 | + |
| 155 | +func (b *volumeBinder) revertAssumedPVCs(claims []*v1.PersistentVolumeClaim) { |
| 156 | + for _, claim := range claims { |
| 157 | + b.pvcCache.Restore(getPVCName(claim)) |
| 158 | + } |
| 159 | +} |
| 160 | +``` |
| 161 | + |
| 162 | +这是 VolumeBinding 插件的 `Unreserve()` 函数的实现。如果后续的 Pod 调度过程失败,则只需要将缓存中的数据重置即可。 |
0 commit comments