Failure Handling and Preemption Scheduling

1. When Preemption Is Triggered

Searching the code for the keyword Preempt leads to the Preempt() method under pkg/scheduler/framework/preemption. Tracing the call chain upward leads to pkg/scheduler/framework/plugins/defaultpreemption, where the method is called directly by PostFilter() in default_preemption.go. This shows that preemption exists as one of the scheduling plugins at the PostFilter extension point. RunPostFilterPlugins() is invoked when SchedulePod() returns a non-nil error, that is, when the Predicates and Priorities phases failed to find a single feasible node to run the Pod. The call site is shown in the code below.

func (sched *Scheduler) schedulingCycle(
    ctx context.Context,
    state *framework.CycleState,
    fwk framework.Framework,
    podInfo *framework.QueuedPodInfo,
    start time.Time,
    podsToActivate *framework.PodsToActivate,
) (ScheduleResult, *framework.QueuedPodInfo, *framework.Status) {
    logger := klog.FromContext(ctx)
    pod := podInfo.Pod
    scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)
    if err != nil {
        ......
        fitError, ok := err.(*framework.FitError)
        ......
        result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatus)
        ......
    }
    ......
}

The logic connecting RunPostFilterPlugins() and PostFilter() is the same as for other plugin types, so the analysis of the preemption flow starts from the PostFilter() method implemented by DefaultPreemption.

func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusReader) (*framework.PostFilterResult, *framework.Status) {
    defer func() {
        metrics.PreemptionAttempts.Inc()
    }()

    result, status := pl.Evaluator.Preempt(ctx, state, pod, m)
    msg := status.Message()
    if len(msg) > 0 {
        return result, framework.NewStatus(status.Code(), "preemption: "+msg)
    }
    return result, status
}

PostFilter() calls the Preempt() method, which is the actual entry point of the preemption logic. After a brief look at the components involved in preemption, we can dive into how the preemption logic is implemented.

2. Key Components

2.1. Evaluator

In kube-scheduler, the Evaluator is the core component responsible for the preemption flow. It handles flow control and coordination, while the concrete policy decisions are made by a plugin implementing the Interface interface; the PreemptPod() method is one of the core actions in the preemption process.

type Evaluator struct {
    // plugin name
    PluginName string
    // scheduling framework handle
    Handler    framework.Handle
    // Pod lister interface
    PodLister  corelisters.PodLister
    // PDB lister interface
    PdbLister  policylisters.PodDisruptionBudgetLister
    // async preemption switch
    enableAsyncPreemption bool
    // read-write lock
    mu                    sync.RWMutex
    // set of Pods currently in the preemption flow
    preempting sets.Set[types.UID]
    // preemption logic function
    PreemptPod func(ctx context.Context, c Candidate, preemptor, victim *v1.Pod, pluginName string) error
    // plugin interface
    Interface
}

The Interface interface declares the following methods; their implementations are analyzed as part of the preemption flow, so they are not described here.

type Interface interface {
    // get the offset into the node list and the number of candidates
    GetOffsetAndNumCandidates(nodes int32) (int32, int32)
    // map candidate nodes to their victim Pods
    CandidatesToVictimsMap(candidates []Candidate) map[string]*extenderv1.Victims
    // check whether the Pod is eligible to preempt others
    PodEligibleToPreemptOthers(ctx context.Context, pod *v1.Pod, nominatedNodeStatus *framework.Status) (bool, string)
    // select victim Pods on a candidate node
    SelectVictimsOnNode(ctx context.Context, state *framework.CycleState,
        pod *v1.Pod, nodeInfo *framework.NodeInfo, pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *framework.Status)
    // ordered node scoring functions
    OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64
}

Creating the Evaluator

The creation of the Evaluator instance can be traced back to the NewInTreeRegistry() function, which creates and registers the built-in plugins. The plugin struct holds the framework handle fh, the feature-gate set fts, the plugin arguments args, the Pod and PDB listers, and the Evaluator instance.

func NewInTreeRegistry() runtime.Registry {
    ......
    registry := runtime.Registry{
        // register the default preemption plugin
        defaultpreemption.Name:               runtime.FactoryAdapter(fts, defaultpreemption.New),
    }

    return registry
}

func New(_ context.Context, dpArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) {
    args, ok := dpArgs.(*config.DefaultPreemptionArgs)
    if !ok {
        return nil, fmt.Errorf("got args of type %T, want *DefaultPreemptionArgs", dpArgs)
    }
    if err := validation.ValidateDefaultPreemptionArgs(nil, args); err != nil {
        return nil, err
    }

    podLister := fh.SharedInformerFactory().Core().V1().Pods().Lister()
    pdbLister := getPDBLister(fh.SharedInformerFactory())
    // create the plugin instance
    pl := DefaultPreemption{
        fh:        fh,
        fts:       fts,
        args:      *args,
        podLister: podLister,
        pdbLister: pdbLister,
    }
    pl.Evaluator = preemption.NewEvaluator(Name, fh, &pl, fts.EnableAsyncPreemption)

    return &pl, nil
}

// create the Evaluator
func NewEvaluator(pluginName string, fh framework.Handle, i Interface, enableAsyncPreemption bool) *Evaluator {
    podLister := fh.SharedInformerFactory().Core().V1().Pods().Lister()
    pdbLister := fh.SharedInformerFactory().Policy().V1().PodDisruptionBudgets().Lister()

    ev := &Evaluator{
        PluginName:            names.DefaultPreemption,
        Handler:               fh,
        PodLister:             podLister,
        PdbLister:             pdbLister,
        Interface:             i,
        enableAsyncPreemption: enableAsyncPreemption,
        preempting:            sets.New[types.UID](),
    }
    // register the PreemptPod() method
    ev.PreemptPod = func(ctx context.Context, c Candidate, preemptor, victim *v1.Pod, pluginName string) error {
        ......
    }

    return ev
}

2.2. Nominator

We already met the Nominator as a component of the scheduling queue. Although it belongs to SchedulingQueue, it is also tied to the preemption flow, because the framework.PostFilterResult returned by the preemption plugin is effectively a pointer to a NominatingInfo, a structure normally used by the Nominator. Furthermore, at the end of the preemption flow the Pod's Status.NominatedNodeName field is updated and then used in later scheduling cycles: for example, in the Predicates phase, findNodesThatFitPod() checks whether Pod.Status.NominatedNodeName is non-empty, and if so evaluates the nominated node on its own first, falling back to the standard flow only if that node does not fit.

type PostFilterResult struct {
    *NominatingInfo
}

type NominatingInfo struct {
    NominatedNodeName string
    NominatingMode    NominatingMode
}
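
To make the nominated-node shortcut concrete, here is a minimal, self-contained sketch of the idea; it is not the scheduler's actual code, and nodeFits merely stands in for running the Filter plugins.

package main

import "fmt"

// nodeFits stands in for running the Filter plugins against a node.
func nodeFits(node, pod string) bool { return node == "node-2" }

// feasibleNodes sketches the idea behind findNodesThatFitPod: when the Pod
// carries a NominatedNodeName from an earlier preemption, that single node is
// evaluated first, and the full list is scanned only if it no longer fits.
func feasibleNodes(pod, nominated string, allNodes []string) []string {
    if nominated != "" && nodeFits(nominated, pod) {
        return []string{nominated}
    }
    var fit []string
    for _, n := range allNodes {
        if nodeFits(n, pod) {
            fit = append(fit, n)
        }
    }
    return fit
}

func main() {
    nodes := []string{"node-1", "node-2", "node-3"}
    fmt.Println(feasibleNodes("pod-a", "node-2", nodes)) // [node-2]
    fmt.Println(feasibleNodes("pod-a", "", nodes))       // [node-2]
}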

3. The Preemption Flow in Detail

With the groundwork in place: PostFilter() executes Preempt() and returns its result directly, so the detailed analysis of the preemption flow starts from the Preempt() method at pkg/scheduler/framework/preemption/preemption.go.

The comments in the source show that preemption proceeds in roughly six steps:

  • Step 1: fetch the latest state of the Pod through the PodLister;

  • Step 2: confirm that the Pod is eligible to preempt other Pods;

  • Step 3: find all candidate nodes on which preemption could happen;

  • Step 4: call the registered extenders to further narrow the candidate set;

  • Step 5: select the best candidate node according to various criteria;

  • Step 6: perform the preemption and evict the lower-priority Pods.

After these six steps the final preemption result is returned to the scheduler. Overall the process closely resembles the standard scheduling flow, containing both a filtering and a scoring stage; in the end, all of this logic revolves around the scheduler's core duty: choose a suitable Node for the Pod, then tell the Pod the Node's name.

func (ev *Evaluator) Preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusReader) (*framework.PostFilterResult, *framework.Status) {
    logger := klog.FromContext(ctx)

    // 0) Fetch the latest version of <pod>.
    // It's safe to directly fetch pod here. Because the informer cache has already been
    // initialized when creating the Scheduler obj.
    // However, tests may need to manually initialize the shared pod informer.
    podNamespace, podName := pod.Namespace, pod.Name
    pod, err := ev.PodLister.Pods(pod.Namespace).Get(pod.Name)
    if err != nil {
        logger.Error(err, "Could not get the updated preemptor pod object", "pod", klog.KRef(podNamespace, podName))
        return nil, framework.AsStatus(err)
    }

    // 1) Ensure the preemptor is eligible to preempt other pods.
    nominatedNodeStatus := m.Get(pod.Status.NominatedNodeName)
    if ok, msg := ev.PodEligibleToPreemptOthers(ctx, pod, nominatedNodeStatus); !ok {
        logger.V(5).Info("Pod is not eligible for preemption", "pod", klog.KObj(pod), "reason", msg)
        return nil, framework.NewStatus(framework.Unschedulable, msg)
    }

    // 2) Find all preemption candidates.
    allNodes, err := ev.Handler.SnapshotSharedLister().NodeInfos().List()
    if err != nil {
        return nil, framework.AsStatus(err)
    }
    candidates, nodeToStatusMap, err := ev.findCandidates(ctx, state, allNodes, pod, m)
    if err != nil && len(candidates) == 0 {
        return nil, framework.AsStatus(err)
    }

    // Return a FitError only when there are no candidates that fit the pod.
    if len(candidates) == 0 {
        fitError := &framework.FitError{
            Pod:         pod,
            NumAllNodes: len(allNodes),
            Diagnosis: framework.Diagnosis{
                NodeToStatus: nodeToStatusMap,
                // Leave UnschedulablePlugins or PendingPlugins as nil as it won't be used on moving Pods.
            },
        }
        fitError.Diagnosis.NodeToStatus.SetAbsentNodesStatus(framework.NewStatus(framework.UnschedulableAndUnresolvable, "Preemption is not helpful for scheduling"))
        // Specify nominatedNodeName to clear the pod's nominatedNodeName status, if applicable.
        return framework.NewPostFilterResultWithNominatedNode(""), framework.NewStatus(framework.Unschedulable, fitError.Error())
    }

    // 3) Interact with registered Extenders to filter out some candidates if needed.
    candidates, status := ev.callExtenders(logger, pod, candidates)
    if !status.IsSuccess() {
        return nil, status
    }

    // 4) Find the best candidate.
    bestCandidate := ev.SelectCandidate(ctx, candidates)
    if bestCandidate == nil || len(bestCandidate.Name()) == 0 {
        return nil, framework.NewStatus(framework.Unschedulable, "no candidate node for preemption")
    }

    logger.V(2).Info("the target node for the preemption is determined", "node", bestCandidate.Name(), "pod", klog.KObj(pod))

    // 5) Perform preparation work before nominating the selected candidate.
    if ev.enableAsyncPreemption {
        ev.prepareCandidateAsync(bestCandidate, pod, ev.PluginName)
    } else {
        if status := ev.prepareCandidate(ctx, bestCandidate, pod, ev.PluginName); !status.IsSuccess() {
            return nil, status
        }
    }

    return framework.NewPostFilterResultWithNominatedNode(bestCandidate.Name()), framework.NewStatus(framework.Success)
}

Each stage is analyzed below; refreshing the Pod information is a routine operation and needs no further discussion.

3.1. Preemption Eligibility Check

The status recorded for the nominated node is fetched via the Get() method of NodeToStatusReader and passed to PodEligibleToPreemptOthers(), which decides whether the Pod is eligible to preempt. It first checks that the preemption policy in Pod.Spec.PreemptionPolicy, if set, is not Never, then obtains the node snapshot and the Pod's NominatedNodeName. A non-empty NominatedNodeName means the Pod already went through a preemption round; showing up at PostFilter again means that preemption did not lead to a successful schedule. The question is then whether this Pod may preempt again. If the recorded status is UnschedulableAndUnresolvable, the nominated node has become permanently unusable for some reason, and a fresh round of preemption is allowed. For other failure reasons such as Unschedulable, preemption may be retried only if no Pod of lower priority on the nominated node is currently terminating because of preemption; in this case, where the failure was caused by transient resource conditions, the Pod may retry preemption and attempt to schedule onto other nodes so that resources are not wasted. Generally speaking, preemption solves Unschedulable problems, while a retry after UnschedulableAndUnresolvable reflects an unexpected change between the previous scheduling attempt and the current cycle, which is why re-preemption is allowed.

Recall the failure statuses of the Filter extension point. Unschedulable indicates a transient scheduling failure, such as insufficient CPU or unsatisfied inter-Pod affinity, which may resolve itself in a later scheduling cycle; no human intervention is needed, only scheduler retries. UnschedulableAndUnresolvable indicates an unsatisfied hard constraint, such as a non-matching NodeSelector or a failed PV binding. The key difference between the transient and the permanent case is whether the blocking condition changes with the lifecycle of Pods.

// 1) Ensure the preemptor is eligible to preempt other pods.
nominatedNodeStatus := m.Get(pod.Status.NominatedNodeName)
if ok, msg := ev.PodEligibleToPreemptOthers(ctx, pod, nominatedNodeStatus); !ok {
    logger.V(5).Info("Pod is not eligible for preemption", "pod", klog.KObj(pod), "reason", msg)
    return nil, framework.NewStatus(framework.Unschedulable, msg)
}

func (pl *DefaultPreemption) PodEligibleToPreemptOthers(_ context.Context, pod *v1.Pod, nominatedNodeStatus *framework.Status) (bool, string) {
    // make sure the preemption policy permits preemption
    if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
        return false, "not eligible due to preemptionPolicy=Never."
    }
    // get the node info from the snapshot
    nodeInfos := pl.fh.SnapshotSharedLister().NodeInfos()
    nomNodeName := pod.Status.NominatedNodeName
    // the Pod already went through a round of preemption and carries a NominatedNodeName,
    // yet failed the Filter phase of this cycle and reached the preemption flow again
    if len(nomNodeName) > 0 {
        // the nomination failed permanently; allow a retry
        if nominatedNodeStatus.Code() == framework.UnschedulableAndUnresolvable {
            return true, ""
        }
        // iterate over the Pods on the nominated node
        if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil {
            podPriority := corev1helpers.PodPriority(pod)
            for _, p := range nodeInfo.Pods {
                if corev1helpers.PodPriority(p.Pod) < podPriority && podTerminatingByPreemption(p.Pod) {
                    // a lower-priority Pod is already terminating due to preemption; do not preempt again
                    return false, "not eligible due to a terminating pod on the nominated node."
                }
            }
        }
    }
    return true, ""
}
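
For reference, the preemptionPolicy checked above lives in the Pod spec. Below is a minimal sketch of a Pod that opts out of preempting others; the Pod name and PriorityClass name are made up for illustration.

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    never := v1.PreemptNever
    pod := v1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: "low-impact-job"},
        Spec: v1.PodSpec{
            PriorityClassName: "batch-low", // hypothetical PriorityClass
            PreemptionPolicy:  &never,      // this Pod will never preempt others
        },
    }
    fmt.Println(pod.Name, *pod.Spec.PreemptionPolicy)
}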

3.2. Finding Candidate Nodes

Once the Pod is confirmed eligible to preempt, the full node list is obtained from the lister, and findCandidates() returns all candidate nodes along with their statuses. The NodeToStatusReader interface is then used to collect the nodes recorded as Unschedulable; UnschedulableAndUnresolvable marks a hard constraint that will not become satisfied as Pods come and go, so such nodes are excluded from preemption. If the resulting potentialNodes list is empty, no node exists on which preemption could help; the Pod's NominatedNodeName is cleared and the flow ends. If potentialNodes is non-empty, all PDBs in the cluster are fetched first, because evicting a Pod may also affect PDBs in other namespaces, hence the full set is retrieved.

func (ev *Evaluator) findCandidates(ctx context.Context, state *framework.CycleState, allNodes []*framework.NodeInfo, pod *v1.Pod, m framework.NodeToStatusReader) ([]Candidate, *framework.NodeToStatus, error) {
    // check the total node count
    if len(allNodes) == 0 {
        return nil, nil, errors.New("no nodes available")
    }
    logger := klog.FromContext(ctx)
    // narrow the potential candidates down to nodes whose status is Unschedulable
    potentialNodes, err := m.NodesForStatusCode(ev.Handler.SnapshotSharedLister().NodeInfos(), framework.Unschedulable)
    if err != nil {
        return nil, nil, err
    }
    // check the number of potential nodes
    if len(potentialNodes) == 0 {
        logger.V(3).Info("Preemption will not help schedule pod on any node", "pod", klog.KObj(pod))
        // In this case, we should clean-up any existing nominated node name of the pod.
        if err := util.ClearNominatedNodeName(ctx, ev.Handler.ClientSet(), pod); err != nil {
            logger.Error(err, "Could not clear the nominatedNodeName field of pod", "pod", klog.KObj(pod))
            // We do not return as this error is not critical.
        }
        return nil, framework.NewDefaultNodeToStatus(), nil
    }
    // fetch the PDBs
    pdbs, err := getPodDisruptionBudgets(ev.PdbLister)
    if err != nil {
        return nil, nil, err
    }
    // determine the range of candidates for this batch
    offset, candidatesNum := ev.GetOffsetAndNumCandidates(int32(len(potentialNodes)))
    // run the preemption dry run
    return ev.DryRunPreemption(ctx, state, pod, potentialNodes, pdbs, offset, candidatesNum)
}

Pod Disruption Budgets

A PDB (PodDisruptionBudget) constrains the minimum available / maximum unavailable number of Pod replicas, protecting an application from service disruption. It comes into play in scenarios such as preemption eviction, node draining, rolling updates, and horizontal scaling. The feature became stable in version 1.21; see the official documentation for details.
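
As a concrete reference, a minimal PDB object built from the Kubernetes API types looks as follows; the name, namespace, and selector are made up for illustration.

package main

import (
    "fmt"

    policyv1 "k8s.io/api/policy/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
    // keep at least 2 replicas of app=web available; an eviction that would
    // drop availability below this budget counts as a PDB violation
    minAvailable := intstr.FromInt32(2)
    pdb := policyv1.PodDisruptionBudget{
        ObjectMeta: metav1.ObjectMeta{Name: "web-pdb", Namespace: "default"},
        Spec: policyv1.PodDisruptionBudgetSpec{
            MinAvailable: &minAvailable,
            Selector: &metav1.LabelSelector{
                MatchLabels: map[string]string{"app": "web"},
            },
        },
    }
    fmt.Printf("%s/%s minAvailable=%s\n", pdb.Namespace, pdb.Name, pdb.Spec.MinAvailable.String())
}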

Node Sampling

GetOffsetAndNumCandidates() takes an int32 and returns two values: the starting offset into the node list and the number of candidate nodes.

func (pl *DefaultPreemption) GetOffsetAndNumCandidates(numNodes int32) (int32, int32) {
    return rand.Int31n(numNodes), pl.calculateNumCandidates(numNodes)
}

The rule for choosing the number of candidates is shown below. It involves two parameters, MinCandidateNodesPercentage and MinCandidateNodesAbsolute, the minimum percentage and the minimum absolute count; their default values can be found in pkg/scheduler/apis/config/testing/defaults/defaults.go.

var PluginConfigsV1 = []config.PluginConfig{
    {
        Name: "DefaultPreemption",
        Args: &config.DefaultPreemptionArgs{
            MinCandidateNodesPercentage: 10,
            MinCandidateNodesAbsolute:   100,
        },
    },
}

Sample size = total nodes × minimum percentage, bounded below by the minimum absolute value and above by the total node count. With the defaults above, 5000 nodes yield 500 candidates, while 200 nodes yield 20, which is raised to the floor of 100.

func (pl *DefaultPreemption) calculateNumCandidates(numNodes int32) int32 {
    // sample size = total nodes * minimum percentage
    n := (numNodes * pl.args.MinCandidateNodesPercentage) / 100
    // if the sample size < minimum absolute value
    if n < pl.args.MinCandidateNodesAbsolute {
        // sample size = minimum absolute value
        n = pl.args.MinCandidateNodesAbsolute
    }
    // if the sample size > total node count
    if n > numNodes {
        // sample size = total node count
        n = numNodes
    }
    return n
}
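
A quick sanity check of the rule with the default values of 10% and 100, written as a standalone sketch rather than scheduler code:

package main

import "fmt"

// numCandidates mirrors the sampling rule above: total*percentage/100,
// floored at the absolute minimum and capped at the total node count.
func numCandidates(numNodes, minPercentage, minAbsolute int32) int32 {
    n := numNodes * minPercentage / 100
    if n < minAbsolute {
        n = minAbsolute
    }
    if n > numNodes {
        n = numNodes
    }
    return n
}

func main() {
    for _, nodes := range []int32{50, 200, 5000} {
        fmt.Printf("%d nodes -> %d candidates\n", nodes, numCandidates(nodes, 10, 100))
    }
    // 50 nodes -> 50, 200 nodes -> 100, 5000 nodes -> 500
}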

Dry-Run Preemption

DryRunPreemption() simulates preemption. Per its signature, it receives the context ctx, the scheduling state state, the preemptor pod, the potential node list potentialNodes, the Pod disruption budgets pdbs, the index offset offset, and the sample size candidatesNum; it returns a candidate node list of type []Candidate and a node status map of type NodeToStatus. The implementation first initializes two lists recording the PDB-violating and non-violating candidates, then uses the parallelizer to run the closure checkNode() over all sampled nodes, obtaining for each node the smallest set of Pods whose removal would free enough resources. Each result is converted into a Candidate and appended to the matching list depending on whether it violates a PDB; if no victim set victims is returned for a node, that node's status record is updated. Finally the merged candidate list and the node status map are returned.

func (ev *Evaluator) DryRunPreemption(ctx context.Context, state *framework.CycleState, pod *v1.Pod, potentialNodes []*framework.NodeInfo,
    pdbs []*policy.PodDisruptionBudget, offset int32, candidatesNum int32) ([]Candidate, *framework.NodeToStatus, error) {

    fh := ev.Handler
    // initialize the candidate lists for PDB-violating and non-violating nodes
    nonViolatingCandidates := newCandidateList(candidatesNum)
    violatingCandidates := newCandidateList(candidatesNum)
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    nodeStatuses := framework.NewDefaultNodeToStatus()

    logger := klog.FromContext(ctx)
    logger.V(5).Info("Dry run the preemption", "potentialNodesNumber", len(potentialNodes), "pdbsNumber", len(pdbs), "offset", offset, "candidatesNumber", candidatesNum)

    var statusesLock sync.Mutex
    var errs []error
    // node-checking closure
    checkNode := func(i int) {
        // copy the node info selected by offset and index
        nodeInfoCopy := potentialNodes[(int(offset)+i)%len(potentialNodes)].Snapshot()
        logger.V(5).Info("Check the potential node for preemption", "node", nodeInfoCopy.Node().Name)
        // clone the cycle state
        stateCopy := state.Clone()
        // core step: pick the Pods to be evicted
        pods, numPDBViolations, status := ev.SelectVictimsOnNode(ctx, stateCopy, pod, nodeInfoCopy, pdbs)
        // victims found: convert the type and append to the matching list
        if status.IsSuccess() && len(pods) != 0 {
            victims := extenderv1.Victims{
                Pods:             pods,
                NumPDBViolations: int64(numPDBViolations),
            }
            c := &candidate{
                victims: &victims,
                name:    nodeInfoCopy.Node().Name,
            }
            if numPDBViolations == 0 {
                nonViolatingCandidates.add(c)
            } else {
                violatingCandidates.add(c)
            }
            // stop once enough candidates have been sampled
            nvcSize, vcSize := nonViolatingCandidates.size(), violatingCandidates.size()
            if nvcSize > 0 && nvcSize+vcSize >= candidatesNum {
                cancel()
            }
            return
        }
        // no victims selected on this node: update its status record
        if status.IsSuccess() && len(pods) == 0 {
            status = framework.AsStatus(fmt.Errorf("expected at least one victim pod on node %q", nodeInfoCopy.Node().Name))
        }
        statusesLock.Lock()
        if status.Code() == framework.Error {
            errs = append(errs, status.AsError())
        }
        nodeStatuses.Set(nodeInfoCopy.Node().Name, status)
        statusesLock.Unlock()
    }
    // check the potential nodes in parallel
    fh.Parallelizer().Until(ctx, len(potentialNodes), checkNode, ev.PluginName)
    // return the non-violating candidates followed by the violating ones
    return append(nonViolatingCandidates.get(), violatingCandidates.get()...), nodeStatuses, utilerrors.NewAggregate(errs)
}

Selecting the Victims on a Node

The parallelizer runs checkNode() on every potential candidate node, and within it SelectVictimsOnNode() finds the smallest set of Pods on the node whose eviction frees enough resources for the preemptor. It first initializes a list of potential victims potentialVictims, then walks all Pods on the node, adding every Pod with lower priority than the preemptor to the list and temporarily removing them from the node. At that point no further Pods could possibly be evicted, so RunFilterPluginsWithNominatedPods(), the same method used in the standard Filter flow, checks whether the preemptor can now be scheduled; if it still cannot, even preemption would not make this node usable. If it can, the next step is to find the minimal eviction cost. The low-priority Pods are first classified by whether evicting them violates a PDB, and the PDB-violating Pods are reprieved first, because those are the ones we least want to affect. Reprieving a Pod means adding it back to the NodeInfo and re-running RunFilterPluginsWithNominatedPods() to verify the preemptor can still be scheduled; if adding the Pod back leaves too few resources for the preemptor, the Pod goes onto the victims list, and if it is a PDB-violating victim, the numViolatingVictim counter is incremented, a value returned to the caller as one of the evaluation criteria. If both PDB-violating and non-violating victims exist, victims is sorted once more by priority, and finally the victim list, the number of PDB-violating victims, and a success status are returned.

As one of the core pieces of the preemption logic, SelectVictimsOnNode() follows a remove-everything, verify, then gradually-reprieve strategy, with closures cutting down code duplication. The reprieve phase processes the two victim lists separately, protecting high-priority Pods and service availability (PDBs) first. It reflects the scheduler's pragmatism in complex scenarios, striking a dynamic balance between algorithmic complexity and execution efficiency.

func (pl *DefaultPreemption) SelectVictimsOnNode(
    ctx context.Context,
    state *framework.CycleState,
    pod *v1.Pod,
    nodeInfo *framework.NodeInfo,
    pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *framework.Status) {
    logger := klog.FromContext(ctx)
    // initialize the list of potential victims
    var potentialVictims []*framework.PodInfo
    // closure: remove a Pod from the node
    removePod := func(rpi *framework.PodInfo) error {
        if err := nodeInfo.RemovePod(logger, rpi.Pod); err != nil {
            return err
        }
        // running the PreFilter extensions keeps state consistent as Pods are removed/added
        status := pl.fh.RunPreFilterExtensionRemovePod(ctx, state, pod, rpi, nodeInfo)
        if !status.IsSuccess() {
            return status.AsError()
        }
        return nil
    }
    // closure: add a Pod back to the node
    addPod := func(api *framework.PodInfo) error {
        nodeInfo.AddPodInfo(api)
        status := pl.fh.RunPreFilterExtensionAddPod(ctx, state, pod, api, nodeInfo)
        if !status.IsSuccess() {
            return status.AsError()
        }
        return nil
    }
    // collect every Pod with lower priority than the preemptor into potentialVictims and temporarily remove them from the node
    podPriority := corev1helpers.PodPriority(pod)
    for _, pi := range nodeInfo.Pods {
        if corev1helpers.PodPriority(pi.Pod) < podPriority {
            potentialVictims = append(potentialVictims, pi)
            if err := removePod(pi); err != nil {
                return nil, 0, framework.AsStatus(err)
            }
        }
    }

    // no lower-priority Pods exist; return the reason in the status
    if len(potentialVictims) == 0 {
        return nil, 0, framework.NewStatus(framework.UnschedulableAndUnresolvable, "No preemption victims found for incoming pod")
    }

    // check whether the preemptor fits assuming all lower-priority Pods are gone
    if status := pl.fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo); !status.IsSuccess() {
        // still unschedulable even then; return immediately
        return nil, 0, status
    }
    var victims []*v1.Pod
    numViolatingVictim := 0
    // sort all potential victims by priority, highest first
    sort.Slice(potentialVictims, func(i, j int) bool { return util.MoreImportantPod(potentialVictims[i].Pod, potentialVictims[j].Pod) })
    // split the potential victims by whether evicting them violates a PDB
    violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims, pdbs)
    // closure: try to reprieve a removed Pod
    reprievePod := func(pi *framework.PodInfo) (bool, error) {
        // add the Pod back first
        if err := addPod(pi); err != nil {
            return false, err
        }
        // then check whether the preemptor is still schedulable
        status := pl.fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
        fits := status.IsSuccess()
        // if restoring this Pod makes the preemptor unschedulable
        if !fits {
            // remove the Pod again
            if err := removePod(pi); err != nil {
                return false, err
            }
            rpi := pi.Pod
            // add the Pod to the victims list
            victims = append(victims, rpi)
            logger.V(5).Info("Pod is a potential preemption victim on node", "pod", klog.KObj(rpi), "node", klog.KObj(nodeInfo.Node()))
        }
        return fits, nil
    }
    // iterate over the PDB-violating victims
    for _, p := range violatingVictims {
        // try to reprieve Pods one by one, from highest priority down
        if fits, err := reprievePod(p); err != nil {
            return nil, 0, framework.AsStatus(err)
        } else if !fits {
            // count the PDB-violating victims
            numViolatingVictim++
        }
    }
    // iterate over the non-violating victims
    for _, p := range nonViolatingVictims {
        // try to reprieve Pods one by one, from highest priority down
        if _, err := reprievePod(p); err != nil {
            return nil, 0, framework.AsStatus(err)
        }
    }

    // violating Pods were reprieved first, so if both lists are non-empty, re-sort victims by priority
    if len(violatingVictims) != 0 && len(nonViolatingVictims) != 0 {
        sort.Slice(victims, func(i, j int) bool { return util.MoreImportantPod(victims[i], victims[j]) })
    }
    // return the victims, the number of PDB-violating victims, and a success status
    return victims, numViolatingVictim, framework.NewStatus(framework.Success)
}

3.3. Extender Filtering

With the candidate nodes and victims in hand, the registered extenders are executed one after another, the output of each serving as the input of the next; after filtering through all extenders, a narrower candidate node list is returned.

HTTPExtender is the mechanism for invoking external extension programs over HTTP: extender.ProcessPreemption() packs the preemption information into an HTTP request to the external program and, upon receiving the response, converts it back into the internal representation and returns it.

func (ev *Evaluator) callExtenders(logger klog.Logger, pod *v1.Pod, candidates []Candidate) ([]Candidate, *framework.Status) {
    // get the registered extenders
    extenders := ev.Handler.Extenders()
    nodeLister := ev.Handler.SnapshotSharedLister().NodeInfos()
    if len(extenders) == 0 {
        return candidates, nil
    }

    // convert []Candidate to map[string]*extenderv1.Victims
    victimsMap := ev.CandidatesToVictimsMap(candidates)
    if len(victimsMap) == 0 {
        return candidates, nil
    }
    // run the extenders in turn
    for _, extender := range extenders {
        if !extender.SupportsPreemption() || !extender.IsInterested(pod) {
            continue
        }
        nodeNameToVictims, err := extender.ProcessPreemption(pod, victimsMap, nodeLister)
        if err != nil {
            if extender.IsIgnorable() {
                logger.Info("Skipped extender as it returned error and has ignorable flag set",
                    "extender", extender.Name(), "err", err)
                continue
            }
            return nil, framework.AsStatus(err)
        }
        // validate the returned results
        for nodeName, victims := range nodeNameToVictims {
            // an invalid node: return an error unless the extender is ignorable
            if victims == nil || len(victims.Pods) == 0 {
                if extender.IsIgnorable() {
                    delete(nodeNameToVictims, nodeName)
                    logger.Info("Ignored node for which the extender didn't report victims", "node", klog.KRef("", nodeName), "extender", extender.Name())
                    continue
                }
                return nil, framework.AsStatus(fmt.Errorf("expected at least one victim pod on node %q", nodeName))
            }
        }
        // update the result and feed it to the next extender
        victimsMap = nodeNameToVictims

        if len(victimsMap) == 0 {
            break
        }
    }
    // convert back to []Candidate and return
    var newCandidates []Candidate
    for nodeName := range victimsMap {
        newCandidates = append(newCandidates, &candidate{
            victims: victimsMap[nodeName],
            name:    nodeName,
        })
    }
    return newCandidates, nil
}

3.4. Selecting the Best Candidate Node

This stage can be understood as the Priorities of the standard flow: scoring functions are executed to pick the candidate node best suited for preemption. The core function is pickOneNodeForPreemption(), whose role is comparable to RunScorePlugins() in the standard flow.

func (ev *Evaluator) SelectCandidate(ctx context.Context, candidates []Candidate) Candidate {
    logger := klog.FromContext(ctx)

    if len(candidates) == 0 {
        return nil
    }
    // a single candidate is returned directly, analogous to the len(feasibleNodes)==1 case
    if len(candidates) == 1 {
        return candidates[0]
    }
    // type conversion
    victimsMap := ev.CandidatesToVictimsMap(candidates)
    // get the ordered list of score functions
    scoreFuncs := ev.OrderedScoreFuncs(ctx, victimsMap)
    // run the score functions
    candidateNode := pickOneNodeForPreemption(logger, victimsMap, scoreFuncs)

    // return the best candidate object
    if victims := victimsMap[candidateNode]; victims != nil {
        return &candidate{
            victims: victims,
            name:    candidateNode,
        }
    }

    // error handling; should never be reached
    logger.Error(errors.New("no candidate selected"), "Should not reach here", "candidates", candidates)
    // keep the flow going instead of aborting
    return candidates[0]
}

pickOneNodeForPreemption() implements the node selection. Judging from the code, it is much simpler than Priorities, with no parallel per-node computation or score normalization. It greedily keeps updating the highest score and the list of top-scoring nodes, feeding the output of one score function into the next as input. Notably, the score functions form a strict priority chain: an earlier function always outranks a later one, and the next link in the chain is consulted only when the current function cannot fully distinguish the best node. As soon as some function yields a single top-scoring node, that node is returned immediately.

func pickOneNodeForPreemption(logger klog.Logger, nodesToVictims map[string]*extenderv1.Victims, scoreFuncs []func(node string) int64) string {
    if len(nodesToVictims) == 0 {
        return ""
    }
    // initialize the candidate list
    allCandidates := make([]string, 0, len(nodesToVictims))
    for node := range nodesToVictims {
        allCandidates = append(allCandidates, node)
    }
    // no score functions supplied; fall back to the defaults
    if len(scoreFuncs) == 0 {
        // fewest PDB violations
        minNumPDBViolatingScoreFunc := func(node string) int64 {
            // the fewer PDB-violating victims, the higher the score
            return -nodesToVictims[node].NumPDBViolations
        }
        // lowest highest-priority victim
        minHighestPriorityScoreFunc := func(node string) int64 {
            // the lower the priority of the first (highest-priority) victim, the higher the score
            highestPodPriority := corev1helpers.PodPriority(nodesToVictims[node].Pods[0])
            return -int64(highestPodPriority)
        }
        // smallest sum of victim priorities
        minSumPrioritiesScoreFunc := func(node string) int64 {
            var sumPriorities int64
            for _, pod := range nodesToVictims[node].Pods {
                // the smaller the priority sum, the higher the score
                sumPriorities += int64(corev1helpers.PodPriority(pod)) + int64(math.MaxInt32+1)
            }
            return -sumPriorities
        }
        // fewest victims
        minNumPodsScoreFunc := func(node string) int64 {
            // the fewer victims, the higher the score
            return -int64(len(nodesToVictims[node].Pods))
        }
        // latest start time among the highest-priority victims
        latestStartTimeScoreFunc := func(node string) int64 {
            // the later the start time, the higher the score
            earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
            if earliestStartTimeOnNode == nil {
                logger.Error(errors.New("earliestStartTime is nil for node"), "Should not reach here", "node", node)
                return int64(math.MinInt64)
            }
            return earliestStartTimeOnNode.UnixNano()
        }

        // register the default functions into scoreFuncs
        scoreFuncs = []func(string) int64{
            // A node with a minimum number of PDB is preferable.
            minNumPDBViolatingScoreFunc,
            // A node with a minimum highest priority victim is preferable.
            minHighestPriorityScoreFunc,
            // A node with the smallest sum of priorities is preferable.
            minSumPrioritiesScoreFunc,
            // A node with the minimum number of pods is preferable.
            minNumPodsScoreFunc,
            // A node with the latest start time of all highest priority victims is preferable.
            latestStartTimeScoreFunc,
            // If there are still ties, then the first Node in the list is selected.
        }
    }
    // run the score functions in order
    for _, f := range scoreFuncs {
        selectedNodes := []string{}
        maxScore := int64(math.MinInt64)
        // score every remaining candidate
        for _, node := range allCandidates {
            score := f(node)
            if score > maxScore {
                maxScore = score
                selectedNodes = []string{}
            }
            if score == maxScore {
                selectedNodes = append(selectedNodes, node)
            }
        }
        // a single node survived the filtering; return it directly
        if len(selectedNodes) == 1 {
            return selectedNodes[0]
        }
        // keep only the top-scoring nodes
        allCandidates = selectedNodes
    }
    // return the first remaining element
    return allCandidates[0]
}
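
The strict ordering of the score-function chain can be seen in a small standalone sketch of the same loop; the node names and scores below are made up for illustration.

package main

import (
    "fmt"
    "math"
)

// pickOne mirrors the tie-breaking loop above: each score function only
// filters the nodes kept by the previous one, so an earlier function
// always outranks a later one.
func pickOne(candidates []string, scoreFuncs []func(string) int64) string {
    for _, f := range scoreFuncs {
        selected := []string{}
        maxScore := int64(math.MinInt64)
        for _, node := range candidates {
            score := f(node)
            if score > maxScore {
                maxScore = score
                selected = []string{}
            }
            if score == maxScore {
                selected = append(selected, node)
            }
        }
        if len(selected) == 1 {
            return selected[0]
        }
        candidates = selected
    }
    return candidates[0]
}

func main() {
    pdbViolations := map[string]int64{"n1": 0, "n2": 0, "n3": 1}
    numVictims := map[string]int64{"n1": 3, "n2": 1, "n3": 1}
    best := pickOne([]string{"n1", "n2", "n3"}, []func(string) int64{
        func(n string) int64 { return -pdbViolations[n] }, // n3 drops out; n1 and n2 tie
        func(n string) int64 { return -numVictims[n] },    // n2 wins with fewer victims
    })
    fmt.Println(best) // n2
}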

3.5. Preparing the Candidate Before Nomination

Once the candidate node and the victims are determined, these unlucky Pods must be evicted from the node. Kubernetes currently supports both synchronous and asynchronous preemption, and the scheduler decides which to use based on the EnableAsyncPreemption feature switch.

if ev.enableAsyncPreemption {
    // asynchronous preemption
    ev.prepareCandidateAsync(bestCandidate, pod, ev.PluginName)
} else {
    // synchronous preemption
    if status := ev.prepareCandidate(ctx, bestCandidate, pod, ev.PluginName); !status.IsSuccess() {
        return nil, status
    }
}

Synchronous Preemption

Synchronous preemption is implemented by prepareCandidate(), which evicts all victims through the parallelizer and then checks for Pods with lower priority than the preemptor that are nominated to the same node, clearing their NominatedNodeName fields if any exist.

func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.Pod, pluginName string) *framework.Status {
    // initialize objects
    fh := ev.Handler
    cs := ev.Handler.ClientSet()

    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    logger := klog.FromContext(ctx)
    errCh := parallelize.NewErrorChannel()
    // evict all victims in parallel
    fh.Parallelizer().Until(ctx, len(c.Victims().Pods), func(index int) {
        if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[index], pluginName); err != nil {
            errCh.SendErrorWithCancel(err, cancel)
        }
    }, ev.PluginName)
    if err := errCh.ReceiveError(); err != nil {
        return framework.AsStatus(err)
    }
    // record metrics
    metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods)))

    // clear the NominatedNodeName field of lower-priority Pods nominated to the same node
    nominatedPods := getLowerPriorityNominatedPods(logger, fh, pod, c.Name())
    if err := util.ClearNominatedNodeName(ctx, cs, nominatedPods...); err != nil {
        // a failure to clear NominatedNodeName does not abort the flow
        logger.Error(err, "Cannot clear 'NominatedNodeName' field")
    }

    return nil
}

As mentioned above, PreemptPod() is registered when the Evaluator is created; its implementation is shown below. It first checks whether the victim Pod is still waiting on the permit plugins: if it is in the Waiting state, its scheduling is terminated directly through the Reject() method. Otherwise, a new condition is constructed and the status is updated locally first; only if the local update succeeds is the API called to patch the victim's status in the API server and delete the Pod object.

PreemptPod = func(ctx context.Context, c Candidate, preemptor, victim *v1.Pod, pluginName string) error {
    logger := klog.FromContext(ctx)
    // check whether the victim is a waiting Pod
    if waitingPod := ev.Handler.GetWaitingPod(victim.UID); waitingPod != nil {
        // a waiting Pod is rejected directly via Reject()
        waitingPod.Reject(pluginName, "preempted")
        logger.V(2).Info("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(preemptor), "waitingPod", klog.KObj(victim), "node", c.Name())
    } else {
        // the victim Pod is not a waiting Pod
        // construct the PodCondition
        condition := &v1.PodCondition{
            Type:    v1.DisruptionTarget,
            Status:  v1.ConditionTrue,
            Reason:  v1.PodReasonPreemptionByScheduler,
            Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", preemptor.Spec.SchedulerName),
        }
        newStatus := victim.Status.DeepCopy()
        // update the Pod status locally
        updated := apipod.UpdatePodCondition(newStatus, condition)
        if updated {
            // after the local update succeeds, patch the Pod status via the API
            if err := util.PatchPodStatus(ctx, ev.Handler.ClientSet(), victim, newStatus); err != nil {
                logger.Error(err, "Could not add DisruptionTarget condition due to preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor))
                return err
            }
        }
        // delete the Pod
        if err := util.DeletePod(ctx, ev.Handler.ClientSet(), victim); err != nil {
            if !apierrors.IsNotFound(err) {
                logger.Error(err, "Tried to preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor))
                return err
            }
            logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
            return nil
        }
        logger.V(2).Info("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
    }

    ev.Handler.EventRecorder().Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by pod %v on node %v", preemptor.UID, c.Name())

    return nil
}

Asynchronous Preemption

Asynchronous preemption avoids blocking the main scheduling loop, improving the scheduler's throughput and responsiveness. Its core logic is essentially the same as the synchronous path, but it additionally maintains the Evaluator's preempting set, recording the Pods currently in a preemption and guarding it with the lock against concurrent access. When evicting victims, synchronous preemption evicts them all in parallel directly, while the asynchronous path performs the eviction in a goroutine: it first evicts N-1 victims in parallel, removes the entry from the preempting set once they succeed, and then evicts the last victim separately.

func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName string) {
    metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods)))

    // initialize objects
    ctx, cancel := context.WithCancel(context.Background())
    errCh := parallelize.NewErrorChannel()
    // same as in synchronous preemption
    preemptPod := func(index int) {
        victim := c.Victims().Pods[index]
        if err := ev.PreemptPod(ctx, c, pod, victim, pluginName); err != nil {
            errCh.SendErrorWithCancel(err, cancel)
        }
    }
    // the Evaluator's preempting set tracks Pods that are preempting
    ev.mu.Lock()
    ev.preempting.Insert(pod.UID)
    ev.mu.Unlock()

    logger := klog.FromContext(ctx)
    // run the rest in a goroutine
    go func() {
        startTime := time.Now()
        result := metrics.GoroutineResultSuccess
        defer metrics.PreemptionGoroutinesDuration.WithLabelValues(result).Observe(metrics.SinceInSeconds(startTime))
        defer metrics.PreemptionGoroutinesExecutionTotal.WithLabelValues(result).Inc()
        defer func() {
            if result == metrics.GoroutineResultError {
                // on failure, move the Pod back to the activeQ
                ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod})
            }
        }()
        defer cancel()
        logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods))

        // clear the NominatedNodeName field of lower-priority Pods nominated to the same node
        nominatedPods := getLowerPriorityNominatedPods(logger, ev.Handler, pod, c.Name())
        if err := util.ClearNominatedNodeName(ctx, ev.Handler.ClientSet(), nominatedPods...); err != nil {
            // a failure to clear NominatedNodeName does not abort the flow
            logger.Error(err, "Cannot clear 'NominatedNodeName' field from lower priority pods on the same target node", "node", c.Name())
            result = metrics.GoroutineResultError
        }
        // no victims to evict: remove the Pod from the preempting set and return
        if len(c.Victims().Pods) == 0 {
            ev.mu.Lock()
            delete(ev.preempting, pod.UID)
            ev.mu.Unlock()

            return
        }
        // evict N-1 victims in parallel
        ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods)-1, preemptPod, ev.PluginName)
        if err := errCh.ReceiveError(); err != nil {
            logger.Error(err, "Error occurred during async preemption")
            result = metrics.GoroutineResultError
        }
        // remove the current Pod from the preempting set
        ev.mu.Lock()
        delete(ev.preempting, pod.UID)
        ev.mu.Unlock()
        // evict the last victim Pod
        if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[len(c.Victims().Pods)-1], pluginName); err != nil {
            logger.Error(err, "Error occurred during async preemption")
            result = metrics.GoroutineResultError
        }

        logger.V(2).Info("Async Preemption finished completely", "preemptor", klog.KObj(pod), "node", c.Name(), "result", result)
    }()
}
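
The eviction pattern above, N-1 victims in parallel followed by the last one on its own after the preempting set is updated, can be sketched in isolation; the names here are hypothetical and no scheduler types are involved.

package main

import (
    "fmt"
    "sync"
)

func main() {
    victims := []string{"victim-1", "victim-2", "victim-3"}
    evict := func(v string) { fmt.Println("evicting", v) }

    // evict N-1 victims in parallel
    var wg sync.WaitGroup
    for _, v := range victims[:len(victims)-1] {
        wg.Add(1)
        go func(v string) { defer wg.Done(); evict(v) }(v)
    }
    wg.Wait()
    // bookkeeping (leaving the preempting set) happens at this point,
    // then the final victim is evicted separately
    evict(victims[len(victims)-1])
}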

3.6. The End of Preemption

After all victim Pods are successfully evicted, a PostFilterResult containing the name of the nominated node is returned; the error information is assembled and returned, and this scheduling cycle ends. In the subsequent failure-handling step, if the result's nominatingInfo field is non-empty, the Pod's Status.NominatedNodeName field is set to that value and the Pod is marked Unschedulable; in a later scheduling cycle it is requeued and tries to schedule onto the nominated node.

func (sched *Scheduler) schedulingCycle(
    ctx context.Context,
    state *framework.CycleState,
    fwk framework.Framework,
    podInfo *framework.QueuedPodInfo,
    start time.Time,
    podsToActivate *framework.PodsToActivate,
) (ScheduleResult, *framework.QueuedPodInfo, *framework.Status) {
    logger := klog.FromContext(ctx)
    pod := podInfo.Pod
    scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)
    if err != nil {
        ...... 
        // the preemption flow
        result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatus)
        msg := status.Message()
        fitError.Diagnosis.PostFilterMsg = msg
        if status.Code() == framework.Error {
            logger.Error(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)
        } else {
            logger.V(5).Info("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)
        }

        var nominatingInfo *framework.NominatingInfo
        if result != nil {
            nominatingInfo = result.NominatingInfo
        }
        // return the preemption result with an Unschedulable status
        return ScheduleResult{nominatingInfo: nominatingInfo}, podInfo, framework.NewStatus(framework.Unschedulable).WithError(err)
    }
    ......
}
func (sched *Scheduler) ScheduleOne(ctx context.Context) {
    ......
    scheduleResult, assumedPodInfo, status := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate)
    if !status.IsSuccess() {
        // failure handling
        sched.FailureHandler(schedulingCycleCtx, fwk, assumedPodInfo, status, scheduleResult.nominatingInfo, start)
        return
    }
    ......
}

4. Handling Scheduling Failures

ScheduleOne()方法中可以看到调度器在两个位置进行了失败处理,不难想到这两处就是SchedulingCycleBindingCycle的结尾,在两个生命周期结束时进行错误判断与处理。

func (sched *Scheduler) ScheduleOne(ctx context.Context) {
    ......

    scheduleResult, assumedPodInfo, status := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate)
    if !status.IsSuccess() {
        sched.FailureHandler(schedulingCycleCtx, fwk, assumedPodInfo, status, scheduleResult.nominatingInfo, start)
        return
    }

    go func() {
        ......

        status := sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, start, podsToActivate)
        if !status.IsSuccess() {
            sched.handleBindingCycleError(bindingCycleCtx, state, fwk, assumedPodInfo, start, scheduleResult, status)
            return
        }
    }()
}

The failure-handling hook FailureHandler is a function of type FailureHandlerFn, set in applyDefaultHandlers() when the scheduler is created.

func (sched *Scheduler) applyDefaultHandlers() {
    sched.SchedulePod = sched.schedulePod
    sched.FailureHandler = sched.handleSchedulingFailure
}

Now for the details of handleSchedulingFailure(). Start with the signature: the method takes six parameters, the context ctx, the framework fwk, the Pod info podInfo, the returned status status, the nomination info nominatingInfo, and the scheduling start time start. Overall it covers recording the scheduling event, handling the Pod's nominated-node information, requeuing the Pod object, and updating the Pod's status.

func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *framework.Status, nominatingInfo *framework.NominatingInfo, start time.Time) {
    calledDone := false
    defer func() {
        if !calledDone {
            // normally AddUnschedulableIfNotPresent calls SchedulingQueue.Done(pod.UID) internally;
            // this guards the corner cases where it is not called, so the Pod is still released properly
            sched.SchedulingQueue.Done(podInfo.Pod.UID)
        }
    }()

    logger := klog.FromContext(ctx)
    // initialize the failure reason
    reason := v1.PodReasonSchedulerError
    if status.IsRejected() {
        // a rejected status means the Pod is unschedulable
        reason = v1.PodReasonUnschedulable
    }
    // record metrics
    switch reason {
    case v1.PodReasonUnschedulable:
        metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
    case v1.PodReasonSchedulerError:
        metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
    }

    // extract the failure information
    pod := podInfo.Pod
    err := status.AsError()
    errMsg := status.Message()

    if err == ErrNoNodesAvailable {
        // no Nodes are registered in the cluster
        logger.V(2).Info("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod))
    } else if fitError, ok := err.(*framework.FitError); ok {
        // rejected by the scheduling plugins as unfit
        // record the UnschedulablePlugins and PendingPlugins
        podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins
        podInfo.PendingPlugins = fitError.Diagnosis.PendingPlugins
        logger.V(2).Info("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", errMsg)
    } else {
        // other internal errors
        logger.Error(err, "Error scheduling pod; retrying", "pod", klog.KObj(pod))
    }

    // look up the current Pod's latest state through the lister
    podLister := fwk.SharedInformerFactory().Core().V1().Pods().Lister()
    cachedPod, e := podLister.Pods(pod.Namespace).Get(pod.Name)
    if e != nil {
        logger.Info("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", e)
    } else {
        // check whether a NodeName is already set
        if len(cachedPod.Spec.NodeName) != 0 {
            logger.Info("Pod has been assigned to node. Abort adding it back to queue.", "pod", klog.KObj(pod), "node", cachedPod.Spec.NodeName)
        } else {
            // no NodeName: deep-copy the Pod and requeue it
            podInfo.PodInfo, _ = framework.NewPodInfo(cachedPod.DeepCopy())
            if err := sched.SchedulingQueue.AddUnschedulableIfNotPresent(logger, podInfo, sched.SchedulingQueue.SchedulingCycle()); err != nil {
                logger.Error(err, "Error occurred")
            }
            calledDone = true
        }
    }

    // try to add the Pod with its nomination info to the Nominator
    if sched.SchedulingQueue != nil {
        sched.SchedulingQueue.AddNominatedPod(logger, podInfo.PodInfo, nominatingInfo)
    }

    if err == nil {
        return
    }
    // record the event
    msg := truncateMessage(errMsg)
    fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
    // update the Pod status, including the nomination info and the condition
    if err := updatePod(ctx, sched.client, pod, &v1.PodCondition{
        Type:    v1.PodScheduled,
        Status:  v1.ConditionFalse,
        Reason:  reason,
        Message: errMsg,
    }, nominatingInfo); err != nil {
        logger.Error(err, "Error updating pod", "pod", klog.KObj(pod))
    }
}