ReplicaSetController Internals Explained

1. Instance Creation

Instance creation follows the generic controller bootstrap logic: Informers are created for the Pod and ReplicaSet objects, and the controller's GVK is {Group: apps, Version: v1, Kind: ReplicaSet}.

// Create the ControllerDescriptor for ReplicaSetController
func newReplicaSetControllerDescriptor() *ControllerDescriptor {
    return &ControllerDescriptor{
        name:     names.ReplicaSetController,
        aliases:  []string{"replicaset"},
        initFunc: startReplicaSetController,
    }
}

// Initialization function for ReplicaSetController
func startReplicaSetController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
    go replicaset.NewReplicaSetController(
        ctx,
        controllerContext.InformerFactory.Apps().V1().ReplicaSets(),
        controllerContext.InformerFactory.Core().V1().Pods(),
        controllerContext.ClientBuilder.ClientOrDie("replicaset-controller"),
        replicaset.BurstReplicas,
    ).Run(ctx, int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs))
    return nil, true, nil
}

// Create the ReplicaSetController instance
func NewReplicaSetController(ctx context.Context, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
    logger := klog.FromContext(ctx)
    eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
    if err := metrics.Register(legacyregistry.Register); err != nil {
        logger.Error(err, "unable to register metrics")
    }
    return NewBaseController(logger, rsInformer, podInformer, kubeClient, burstReplicas,
        apps.SchemeGroupVersion.WithKind("ReplicaSet"),
        "replicaset_controller",
        "replicaset",
        controller.RealPodControl{
            KubeClient: kubeClient,
            Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
        },
        eventBroadcaster,
    )
}

// Start the ReplicaSetController
func (rsc *ReplicaSetController) Run(ctx context.Context, workers int) {
    defer utilruntime.HandleCrash()

    // Start events processing pipeline.
    rsc.eventBroadcaster.StartStructuredLogging(3)
    rsc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: rsc.kubeClient.CoreV1().Events("")})
    defer rsc.eventBroadcaster.Shutdown()

    defer rsc.queue.ShutDown()

    controllerName := strings.ToLower(rsc.Kind)
    logger := klog.FromContext(ctx)
    logger.Info("Starting controller", "name", controllerName)
    defer logger.Info("Shutting down controller", "name", controllerName)

    if !cache.WaitForNamedCacheSync(rsc.Kind, ctx.Done(), rsc.podListerSynced, rsc.rsListerSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        // re-run the worker with a one-second pause between iterations
        go wait.UntilWithContext(ctx, rsc.worker, time.Second)
    }

    <-ctx.Done()
}
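Note that wait.UntilWithContext does not run the worker exactly once per second: it re-invokes the worker whenever it returns, sleeping one second between invocations, until the context is cancelled. A minimal standalone sketch of that behavior:

package main

import (
    "context"
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    // Cancel after three seconds so the loop terminates.
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    // Runs f, waits one second after it returns, then runs it again.
    wait.UntilWithContext(ctx, func(ctx context.Context) {
        fmt.Println("worker tick")
    }, time.Second)
}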

1.1. Startup

This part of the code is boilerplate shared with every other controller. The syncHandler() method is registered in the NewBaseController() function during instance creation; for this controller it is actually the syncReplicaSet() method.

func (rsc *ReplicaSetController) worker(ctx context.Context) {
    for rsc.processNextWorkItem(ctx) {
    }
}

func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool {
    key, quit := rsc.queue.Get()
    if quit {
        return false
    }
    defer rsc.queue.Done(key)
    // run the reconciliation
    err := rsc.syncHandler(ctx, key)
    if err == nil {
        rsc.queue.Forget(key)
        return true
    }

    utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
    rsc.queue.AddRateLimited(key)

    return true
}

2. Controller Reconciliation Logic

When an Informer observes a change, the object is added to the work queue as a key in namespace/name form, so the reconciliation logic must split the key back into the namespace and the object name.
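A quick standalone illustration of the key round trip, using client-go's cache helpers:

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "web-0"}}

    // Event handlers enqueue objects under a "namespace/name" key...
    key, _ := cache.MetaNamespaceKeyFunc(pod)
    fmt.Println(key) // default/web-0

    // ...and the sync handler splits the key back apart.
    ns, name, _ := cache.SplitMetaNamespaceKey(key)
    fmt.Println(ns, name) // default web-0
}

The reconciliation entry point syncReplicaSet() is shown below.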

func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error {
    logger := klog.FromContext(ctx)
    startTime := time.Now()
    defer func() {
        logger.Info("Finished syncing", "kind", rsc.Kind, "key", key, "duration", time.Since(startTime))
    }()
    // split the key into namespace and name
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    // fetch the ReplicaSet object from the lister
    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
    if apierrors.IsNotFound(err) {
        logger.V(4).Info("deleted", "kind", rsc.Kind, "key", key)
        rsc.expectations.DeleteExpectations(logger, key)
        return nil
    }
    if err != nil {
        return err
    }
    // check whether the previously recorded expectations have been met
    rsNeedsSync := rsc.expectations.SatisfiedExpectations(logger, key)
    // get the object's label selector
    selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector for rs %v/%v: %v", namespace, name, err))
        return nil
    }

    // list all Pods in the namespace
    allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
    if err != nil {
        return err
    }
    // filter down to active (non-terminated, not-being-deleted) Pods
    filteredPods := controller.FilterActivePods(logger, allPods)
    // then filter down to the Pods managed by this controller
    filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods)
    if err != nil {
        return err
    }

    var manageReplicasErr error
    // replica management is needed
    if rsNeedsSync && rs.DeletionTimestamp == nil {
        manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs)
    }
    // update the status of the ReplicaSet object
    rs = rs.DeepCopy()
    newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

    // Always updates status as pods come up or die.
    updatedRS, err := updateReplicaSetStatus(logger, rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
    if err != nil {
        return err
    }
    // Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
    if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
        updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
        updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
        rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
    }
    return manageReplicasErr
}

2.1. Checking Replica Expectations

The SatisfiedExpectations() method checks whether the object's current state needs to be synced toward the desired state. It is shared by ReplicaSetController, DaemonSetController, and JobController, the controllers that track replica expectations.

func (r *ControllerExpectations) SatisfiedExpectations(logger klog.Logger, controllerKey string) bool {
    if exp, exists, err := r.GetExpectations(controllerKey); exists {
        // check whether the expected counts are satisfied
        if exp.Fulfilled() {
            logger.V(4).Info("Controller expectations fulfilled", "expectations", exp)
            return true
        // check whether the expectations have expired
        } else if exp.isExpired() {
            logger.V(4).Info("Controller expectations expired", "expectations", exp)
            return true
        } else {
            logger.V(4).Info("Controller still waiting on expectations", "expectations", exp)
            return false
        }
    } else if err != nil {
        logger.V(2).Info("Error encountered while checking expectations, forcing sync", "err", err)
    } else {
        logger.V(4).Info("Controller either never recorded expectations, or the ttl expired", "controller", controllerKey)
    }
    return true
}

func (e *ControlleeExpectations) Fulfilled() bool {
    // fulfilled when both the pending-create and pending-delete counts are <= 0
    return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
}

The expectations check happens at the start of every reconciliation. If SatisfiedExpectations() returns false, manageReplicas() will not be executed, so processing of that object is effectively blocked inside the controller until the expectations are met or expire.
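To make this lifecycle concrete, here is a minimal sketch, assuming the internal k8s.io/kubernetes/pkg/controller package were importable, of how ExpectCreations(), CreationObserved(), and SatisfiedExpectations() interact:

package main

import (
    "fmt"

    "k8s.io/klog/v2"
    "k8s.io/kubernetes/pkg/controller"
)

func main() {
    logger := klog.Background()
    exp := controller.NewControllerExpectations()
    key := "default/my-rs" // the ReplicaSet's namespace/name key

    // manageReplicas records two pending creations before issuing them.
    exp.ExpectCreations(logger, key, 2)
    fmt.Println(exp.SatisfiedExpectations(logger, key)) // false: sync is gated

    // The pod informer's Add handler observes each new Pod.
    exp.CreationObserved(logger, key)
    exp.CreationObserved(logger, key)
    fmt.Println(exp.SatisfiedExpectations(logger, key)) // true: next sync proceeds
}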

2.2. Filtering the Pods Managed by the Controller

First all Pods in the namespace are fetched through the PodLister, then claimPods() is called to process them. The method starts by creating a PodControllerRefManager object; in other words, a fresh PodControllerRefManager is built for the target ReplicaSet on every reconciliation to claim Pods, and as a local variable its lifetime ends together with claimPods().

func (rsc *ReplicaSetController) claimPods(ctx context.Context, rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) {
    // function that decides whether Pods may still be adopted
    canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) {
        fresh, err := rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace).Get(ctx, rs.Name, metav1.GetOptions{})
        if err != nil {
            return nil, err
        }
        if fresh.UID != rs.UID {
            return nil, fmt.Errorf("original %v %v/%v is gone: got uid %v, wanted %v", rsc.Kind, rs.Namespace, rs.Name, fresh.UID, rs.UID)
        }
        return fresh, nil
    })
    // create the PodControllerRefManager
    cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, rsc.GroupVersionKind, canAdoptFunc)
    // process the Pods
    return cm.ClaimPods(ctx, filteredPods)
}

Once created, the object has a single task: calling ClaimPods() to filter out the Pods managed by this ReplicaSet. The process matches Pod labels against the ReplicaSet's label selector, so besides the context only the Pod list is passed in; callers may additionally supply filter functions via the variadic filters parameter (see the sketch after the listing). Internally, three functions are defined, match, adopt, and release, to handle a Pod in the corresponding situation.

func (m *PodControllerRefManager) ClaimPods(ctx context.Context, pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) {
    var claimed []*v1.Pod
    var errlist []error
    // match function
    match := func(obj metav1.Object) bool {
        pod := obj.(*v1.Pod)
        // match against the label selector
        if !m.Selector.Matches(labels.Set(pod.Labels)) {
            return false
        }
        // apply any custom filter functions
        for _, filter := range filters {
            if !filter(pod) {
                return false
            }
        }
        return true
    }
    // adopt function
    adopt := func(ctx context.Context, obj metav1.Object) error {
        return m.AdoptPod(ctx, obj.(*v1.Pod))
    }
    // release function
    release := func(ctx context.Context, obj metav1.Object) error {
        return m.ReleasePod(ctx, obj.(*v1.Pod))
    }
    // iterate over the Pod list
    for _, pod := range pods {
        ok, err := m.ClaimObject(ctx, pod, match, adopt, release)
        if err != nil {
            errlist = append(errlist, err)
            continue
        }
        // collect successfully claimed Pods
        if ok {
            claimed = append(claimed, pod)
        }
    }
    return claimed, utilerrors.NewAggregate(errlist)
}
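ReplicaSetController itself passes no extra filters, but the variadic parameter lets a caller narrow the claim further. A hypothetical sketch that only claims Pods already bound to a node (claimScheduledPods is illustrative, not part of the source):

package rsclaim

import (
    "context"

    v1 "k8s.io/api/core/v1"
    "k8s.io/kubernetes/pkg/controller"
)

func claimScheduledPods(ctx context.Context, cm *controller.PodControllerRefManager, pods []*v1.Pod) ([]*v1.Pod, error) {
    // Besides the label selector, require that the Pod has been scheduled.
    scheduled := func(pod *v1.Pod) bool {
        return pod.Spec.NodeName != ""
    }
    return cm.ClaimPods(ctx, pods, scheduled)
}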

The concrete handling of a single Pod is done by ClaimObject(). It first checks whether the Pod has an OwnerReference. If it does not, the Pod is currently an orphan: the method checks whether the controller or the Pod is being deleted and whether the Pod's namespace matches the controller's, and then tries to adopt the Pod. If an OwnerReference does exist, it checks whether the OwnerReference's UID matches the current controller; when the UID matches, the label selector must also be satisfied before the result is returned.

func (m *BaseControllerRefManager) ClaimObject(ctx context.Context, obj metav1.Object, match func(metav1.Object) bool, adopt, release func(context.Context, metav1.Object) error) (bool, error) {
    // get the Pod's controlling OwnerReference
    controllerRef := metav1.GetControllerOfNoCopy(obj)
    // an OwnerReference exists
    if controllerRef != nil {
        // check whether the OwnerReference UID matches this controller
        if controllerRef.UID != m.Controller.GetUID() {
            // managed by another controller
            return false, nil
        }
        if match(obj) {
            // selector (and filters) match: the claim succeeds
            return true, nil
        }
        // the OwnerReference UID matches but the label selector is no longer satisfied
        // check whether the controller is being deleted
        if m.Controller.GetDeletionTimestamp() != nil {
            return false, nil
        }
        // not being deleted: call release to free the Pod
        if err := release(ctx, obj); err != nil {
            // If the pod no longer exists, ignore the error.
            if errors.IsNotFound(err) {
                return false, nil
            }
            // Either someone else released it, or there was a transient error.
            // The controller should requeue and try again if it's still stale.
            return false, err
        }
        // released successfully, so the Pod is not claimed
        return false, nil
    }

    // no OwnerReference: orphan-handling branch
    if m.Controller.GetDeletionTimestamp() != nil || !match(obj) {
        // Ignore if we're being deleted or selector doesn't match.
        return false, nil
    }
    // is the Pod being deleted?
    if obj.GetDeletionTimestamp() != nil {
        return false, nil
    }
    // namespace check
    if len(m.Controller.GetNamespace()) > 0 && m.Controller.GetNamespace() != obj.GetNamespace() {
        return false, nil
    }

    // call adopt to claim the Pod
    if err := adopt(ctx, obj); err != nil {
        // If the pod no longer exists, ignore the error.
        if errors.IsNotFound(err) {
            return false, nil
        }
        // Either someone else claimed it first, or there was a transient error.
        // The controller should requeue and try again if it's still orphaned.
        return false, err
    }
    // adoption succeeded, the Pod is claimed
    return true, nil
}
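What adoption ultimately stamps onto the Pod is a controller OwnerReference. Its shape can be seen with the metav1.NewControllerRef helper, the same one manageReplicas() later passes to CreatePods(); a minimal standalone sketch:

package main

import (
    "fmt"

    apps "k8s.io/api/apps/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    rs := &apps.ReplicaSet{ObjectMeta: metav1.ObjectMeta{Name: "web-5d8f7", UID: "example-uid"}}
    ref := metav1.NewControllerRef(rs, apps.SchemeGroupVersion.WithKind("ReplicaSet"))
    // controller=true marks the managing controller; blockOwnerDeletion=true
    // keeps the owner around under foreground deletion until the Pod is gone.
    fmt.Printf("%s %s controller=%v blockOwnerDeletion=%v\n",
        ref.Kind, ref.Name, *ref.Controller, *ref.BlockOwnerDeletion)
}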

3. Replica Management (Core Logic)

If the current state differs from the desired state, manageReplicas() is called to drive the replica state toward the desired one. From the function signature, the inputs are the context, the list of Pods managed by the controller filteredPods, and the ReplicaSet object rs.

func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    // compute the difference between the actual and the desired replica count
    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    // derive the ReplicaSet's object key via KeyFunc
    rsKey, err := controller.KeyFunc(rs)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
        return nil
    }
    logger := klog.FromContext(ctx)
    // actual replicas fewer than desired
    if diff < 0 {
        diff *= -1
        // cap the number of changes per sync
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        // record the expected creations; works in tandem with the Informer
        rsc.expectations.ExpectCreations(logger, rsKey, diff)
        logger.V(2).Info("Too few replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "creating", diff)
        // create the required number of Pods
        successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
            err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
            if err != nil {
                if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
                    return nil
                }
            }
            return err
        })
        // some Pod creations failed
        if skippedPods := diff - successfulCreations; skippedPods > 0 {
            logger.V(2).Info("Slow-start failure. Skipping creation of pods, decrementing expectations", "podsSkipped", skippedPods, "kind", rsc.Kind, "replicaSet", klog.KObj(rs))
            for i := 0; i < skippedPods; i++ {
                // lower the expected creation count
                rsc.expectations.CreationObserved(logger, rsKey)
            }
        }
        return err
    } else if diff > 0 {
        // actual replicas more than desired
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        logger.V(2).Info("Too many replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "deleting", diff)
        // get indirectly related Pods
        relatedPods, err := rsc.getIndirectlyRelatedPods(logger, rs)
        utilruntime.HandleError(err)
        // choose the Pods to delete
        podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)
        // record the expected deletions
        rsc.expectations.ExpectDeletions(logger, rsKey, getPodKeys(podsToDelete))

        errCh := make(chan error, diff)
        var wg sync.WaitGroup
        wg.Add(diff)
        // delete Pods concurrently
        for _, pod := range podsToDelete {
            go func(targetPod *v1.Pod) {
                defer wg.Done()
                if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil {
                    // Decrement the expected number of deletes because the informer won't observe this deletion
                    podKey := controller.PodKey(targetPod)
                    rsc.expectations.DeletionObserved(logger, rsKey, podKey)
                    if !apierrors.IsNotFound(err) {
                        logger.V(2).Info("Failed to delete pod, decremented expectations", "pod", podKey, "kind", rsc.Kind, "replicaSet", klog.KObj(rs))
                        errCh <- err
                    }
                }
            }(pod)
        }
        wg.Wait()

        select {
        case err := <-errCh:
            // all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
            if err != nil {
                return err
            }
        default:
        }
    }

    return nil
}

3.1. When Replicas Must Be Created

Slow-Start Batch Creation

Pods are created in batches by the slowStartBatch() function; the name means "slow-start batch processing", and its purpose is to throttle concurrency and avoid the system pressure of firing too many requests at once. It receives the total count, the initial batch size initialBatchSize, and the work function fn. The function runs fn in rounds: the initial batch size is 1, a WaitGroup coordinates the goroutines while errors are collected over a channel, the counters and batch size are updated after each round, and finally the number of successes is returned. For example, with count=13 and initialBatchSize=1, the rounds launch 1, 2, 4, and 6 goroutines.

func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
    // number of calls still to make
    remaining := count
    successes := 0
    // run in rounds, doubling the batch size each time
    for batchSize := min(remaining, initialBatchSize); batchSize > 0; batchSize = min(2*batchSize, remaining) {
        errCh := make(chan error, batchSize)
        var wg sync.WaitGroup
        // concurrency coordination
        wg.Add(batchSize)
        for i := 0; i < batchSize; i++ {
            go func() {
                defer wg.Done()
                if err := fn(); err != nil {
                    errCh <- err
                }
            }()
        }
        wg.Wait()
        // update the counters
        curSuccesses := batchSize - len(errCh)
        successes += curSuccesses
        // return immediately on any failure
        if len(errCh) > 0 {
            return successes, <-errCh
        }
        remaining -= batchSize
    }
    return successes, nil
}
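A tiny standalone simulation of the doubling schedule, assuming every fn call succeeds, makes the round sizes concrete:

package main

import "fmt"

func main() {
    // Mirrors slowStartBatch's schedule for count=13, initialBatchSize=1.
    remaining, batch := 13, 1
    for remaining > 0 {
        if batch > remaining {
            batch = remaining
        }
        fmt.Println("round size:", batch) // prints 1, 2, 4, 6
        remaining -= batch
        batch *= 2
    }
}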

Handling Creation Failures and the EventHandler

The flow above called expectations.ExpectCreations() to record the expected number of replicas to create/delete. The expectations value acts as a buffered counter that carries over into later cycles; the SatisfiedExpectations() check performed at the start of a sync inspects exactly this value, and for the core logic to proceed, both ControlleeExpectations.add and ControlleeExpectations.del are expected to be no greater than 0 at check time. This relies on the PodInformer cooperating via CreationObserved(): in the registered EventHandler (sketched after the listing below), each time the creation of a Pod belonging to this ReplicaSet is observed, CreationObserved() is called to decrement the counter's add field by one. So if all create operations succeed, the next expectations check passes, and manageReplicas() recomputes the diff and performs creations/deletions again. If errors occurred during creation, CreationObserved() is called (expected creations minus successful creations) times, so the counter is still driven to zero.

func (r *ControllerExpectations) CreationObserved(logger klog.Logger, controllerKey string) {
    r.LowerExpectations(logger, controllerKey, 1, 0)
}

func (r *ControllerExpectations) LowerExpectations(logger klog.Logger, controllerKey string, add, del int) {
    if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
        exp.Add(int64(-add), int64(-del))
        // The expectations might've been modified since the update on the previous line.
        logger.V(4).Info("Lowered expectations", "expectations", exp)
    }
}
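For reference, this is roughly how the Pod Add handler in replica_set.go ties the two sides together (an abridged paraphrase, not the verbatim source):

func (rsc *ReplicaSetController) addPod(logger klog.Logger, obj interface{}) {
    pod := obj.(*v1.Pod)
    // ... deletion and orphan handling elided ...
    if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
        rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
        if rs == nil {
            return
        }
        rsKey, err := controller.KeyFunc(rs)
        if err != nil {
            return
        }
        // One expected creation has materialized: lower the add counter,
        // then requeue the owning ReplicaSet.
        rsc.expectations.CreationObserved(logger, rsKey)
        rsc.queue.Add(rsKey)
    }
}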

3.2. When Replicas Must Be Scaled Down

If the actual replica count exceeds the desired count, Pod replicas must be scaled down. The controller first caps the number to process, then finds the Pods related to the current ReplicaSet, selects from them the replicas to delete, and finally performs the deletions concurrently using goroutines with an error channel. The error channel is checked afterwards, and the first error encountered aborts the operation.

func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    ......
    } else if diff > 0 {
        // cap the number of deletions
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        logger.V(2).Info("Too many replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "deleting", diff)
        // get indirectly related Pods
        relatedPods, err := rsc.getIndirectlyRelatedPods(logger, rs)
        utilruntime.HandleError(err)
        // choose the Pods to delete
        podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)
        // record the expected deletions
        rsc.expectations.ExpectDeletions(logger, rsKey, getPodKeys(podsToDelete))

        errCh := make(chan error, diff)
        var wg sync.WaitGroup
        wg.Add(diff)
        // delete Pods concurrently
        for _, pod := range podsToDelete {
            go func(targetPod *v1.Pod) {
                defer wg.Done()
                if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil {
                    podKey := controller.PodKey(targetPod)
                    rsc.expectations.DeletionObserved(logger, rsKey, podKey)
                    if !apierrors.IsNotFound(err) {
                        logger.V(2).Info("Failed to delete pod, decremented expectations", "pod", podKey, "kind", rsc.Kind, "replicaSet", klog.KObj(rs))
                        errCh <- err
                    }
                }
            }(pod)
        }
        wg.Wait()

        select {
        case err := <-errCh:
            // abort on the first error observed
            if err != nil {
                return err
            }
        default:
        }
    }

    return nil
}

Getting Indirectly Related Replicas

The getIndirectlyRelatedPods() method finds Pods with an indirect ownership relationship. It first initializes a list, then calls getReplicaSetsWithSameController() to fetch all ReplicaSet objects sharing an upper-level controller with the current one; a Deployment, for instance, may manage several ReplicaSet versions at once during a rolling update, and replica management has to account for such scenarios.

func (rsc *ReplicaSetController) getReplicaSetsWithSameController(logger klog.Logger, rs *apps.ReplicaSet) []*apps.ReplicaSet {
    // get the OwnerReference
    controllerRef := metav1.GetControllerOf(rs)
    if controllerRef == nil {
        utilruntime.HandleError(fmt.Errorf("ReplicaSet has no controller: %v", rs))
        return nil
    }
    // look up objects in rsIndexer by OwnerReference.UID
    objects, err := rsc.rsIndexer.ByIndex(controllerUIDIndex, string(controllerRef.UID))
    if err != nil {
        utilruntime.HandleError(err)
        return nil
    }
    // collect the ReplicaSet objects into a list and return it
    relatedRSs := make([]*apps.ReplicaSet, 0, len(objects))
    for _, obj := range objects {
        relatedRSs = append(relatedRSs, obj.(*apps.ReplicaSet))
    }

    if klogV := logger.V(2); klogV.Enabled() {
        klogV.Info("Found related ReplicaSets", "replicaSet", klog.KObj(rs), "relatedReplicaSets", klog.KObjSlice(relatedRSs))
    }

    return relatedRSs
}
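The controllerUIDIndex used above is registered on the ReplicaSet informer when the controller is constructed; a sketch of what that index function looks like (paraphrased, the actual wiring lives in NewBaseController()):

// Index ReplicaSets by the UID of their owning controller, so that all
// siblings under one Deployment can be fetched with a single ByIndex call.
indexers := cache.Indexers{
    controllerUIDIndex: func(obj interface{}) ([]string, error) {
        rs, ok := obj.(*apps.ReplicaSet)
        if !ok {
            return []string{}, nil
        }
        controllerRef := metav1.GetControllerOf(rs)
        if controllerRef == nil {
            return []string{}, nil
        }
        return []string{string(controllerRef.UID)}, nil
    },
}
rsInformer.Informer().AddIndexers(indexers)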

Once the ReplicaSet objects are fetched, the Pods managed by each of them are processed: all ReplicaSets are iterated, Pods matching each label selector are listed and appended to the related-Pod list. If a Pod is already mapped to another ReplicaSet, it is skipped with only a log entry noting that it belongs to multiple ReplicaSets. Finally, the list of Pods related to the current ReplicaSet is returned to the caller.

relatedPods is thus an extended replica collection: it contains not only the Pods matching the current ReplicaSet's label selector, but also the Pods matched by the selectors of sibling ReplicaSets (those sharing the same upper-level controller with the current one).

func (rsc *ReplicaSetController) getIndirectlyRelatedPods(logger klog.Logger, rs *apps.ReplicaSet) ([]*v1.Pod, error) {
    // initialize the Pod collection
    var relatedPods []*v1.Pod
    seen := make(map[types.UID]*apps.ReplicaSet)
    // iterate over all ReplicaSets sharing the same upper-level controller
    for _, relatedRS := range rsc.getReplicaSetsWithSameController(logger, rs) {
        // get the ReplicaSet's label selector
        selector, err := metav1.LabelSelectorAsSelector(relatedRS.Spec.Selector)
        if err != nil {
            continue
        }
        // list Pods matching the selector through the PodLister
        pods, err := rsc.podLister.Pods(relatedRS.Namespace).List(selector)
        if err != nil {
            return nil, err
        }
        // iterate over the Pod list
        for _, pod := range pods {
            // if a mapping already exists, log and skip
            if otherRS, found := seen[pod.UID]; found {
                logger.V(5).Info("Pod is owned by both", "pod", klog.KObj(pod), "kind", rsc.Kind, "replicaSets", klog.KObjSlice([]klog.KMetadata{otherRS, relatedRS}))
                continue
            }
            // record the Pod-to-ReplicaSet mapping
            seen[pod.UID] = relatedRS
            relatedPods = append(relatedPods, pod)
        }
    }
    logger.V(4).Info("Found related pods", "kind", rsc.Kind, "replicaSet", klog.KObj(rs), "pods", klog.KObjSlice(relatedPods))
    return relatedPods, nil
}

Choosing the Replicas to Delete

If the diff is at least the number of replicas currently managed by the ReplicaSet, the result is to delete all of them (the sort is skipped and filteredPods[:diff] covers the whole list). If the diff is smaller, an ActivePodsWithRanks object is built according to the distribution of replicas across nodes, sorted, and the first diff entries of the array are returned.

func getPodsToDelete(filteredPods, relatedPods []*v1.Pod, diff int) []*v1.Pod {
    // fewer deletions requested than current replicas
    if diff < len(filteredPods) {
        // build an ActivePodsWithRanks object
        podsWithRanks := getPodsRankedByRelatedPodsOnSameNode(filteredPods, relatedPods)
        // sort ActivePodsWithRanks; the order of filteredPods is modified in place
        sort.Sort(podsWithRanks)
        // report metrics
        reportSortingDeletionAgeRatioMetric(filteredPods, diff)
    }
    // return the first diff Pods
    return filteredPods[:diff]
}

getPodsRankedByRelatedPodsOnSameNode() takes the list of replicas to rank, podsToRank, managed by the current ReplicaSet, and the list of all selector-matched replicas, relatedPods. It first walks relatedPods to count how the related replicas are distributed across nodes, then walks podsToRank to fill the rank array, and assembles an ActivePodsWithRanks object to return. ActivePodsWithRanks defines Swap() and Less() methods, and because the Pods slice passed in shares its backing array with the caller, running Sort() reorders the caller's list in place.

func getPodsRankedByRelatedPodsOnSameNode(podsToRank, relatedPods []*v1.Pod) controller.ActivePodsWithRanks {
    // initialize
    podsOnNode := make(map[string]int)
    for _, pod := range relatedPods {
        if controller.IsPodActive(pod) {
            // count the related Pods on each node
            podsOnNode[pod.Spec.NodeName]++
        }
    }
    ranks := make([]int, len(podsToRank))
    for i, pod := range podsToRank {
        // fill the rank list in Pod order
        ranks[i] = podsOnNode[pod.Spec.NodeName]
    }
    return controller.ActivePodsWithRanks{Pods: podsToRank, Rank: ranks, Now: metav1.Now()}
}
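A toy illustration of the ranking, assuming all Pods are active: Pods on nodes that host more related replicas receive a higher rank, which the sort uses to prefer them for deletion, spreading the surviving replicas across nodes:

package main

import "fmt"

func main() {
    // Related replicas: three on node-a, one on node-b.
    relatedNodes := []string{"node-a", "node-a", "node-a", "node-b"}
    podsOnNode := map[string]int{}
    for _, n := range relatedNodes {
        podsOnNode[n]++
    }
    // A replica of the current ReplicaSet on node-a gets rank 3, one on
    // node-b gets rank 1, so the node-a replica is preferred for deletion.
    for _, n := range []string{"node-a", "node-b"} {
        fmt.Printf("pod on %s gets rank %d\n", n, podsOnNode[n])
    }
}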