侧边栏壁纸
博主头像
lance

不为失败找借口,只为成功找方法。

  • 累计撰写 28 篇文章
  • 累计创建 0 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

Koordinator核心调度算法.md

lance
2025-03-19 / 0 评论 / 0 点赞 / 219 阅读 / 3,626 字
温馨提示:
本文最后更新于 2025-05-04,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

Koordinator核心调度算法

基于 Node 节点的真实 CPU 与内存使用率来进行 Pod 调度,而不是 Pod 的请求资源。

负载感知调度

场景

  1. 当节点的资源利用率达到高阈值时,节点上正在运行的工作负载之间会发生严重的资源争用;
  2. 集群中的工作负载具有不同的资源需求,有的工作负载对 CPU 资源的利用率要求比较高,而有的工作负载对内存资源的利用率要求比较高;
  3. 由于节点异常,在调度过程中应避免此类节点,以防止出现意外异常;

目标

  1. 提供可配置的调度插件来帮助控制集群资源利用率;
  2. 资源利用率控制机制支持多种资源;
  3. 将资源利用率控制在安全阈值;

设计

img

调度流程:

  • 控制器把将要创建的pod同步到api server中。
  • kubelet从koordlet拉取pod信息,并上报给APIserver中。
  • koordlet将Node和Pods的资源使用情况报告NodeMetric。
  • 调度新的Pod通过watch/list的机制监听到pod中字段nodename为空的进行调度。
  • APIserver推送NodeMetric给调度器。
  • 调度器通过扩展插件进行过滤,打分,保留等操作后,调度pod后进行绑定。

img

调度插件扩展了 Kubernetes 调度框架中定义的 Filter/Score/Reserve/Unreserve 扩展点。

评分算法的核心逻辑是选择资源使用量最小的节点,但是考虑到资源使用上报的延迟和 Pod 启动时间的延迟,

时间窗口内已经调度的 Pod 和当前正在调度的 Pod 的资源请求也会被估算出来,并且估算值将参与计算。

源码

负载感知调度的启动参数如下所示:

// LoadAwareSchedulingArgs holds arguments used to configure the LoadAwareScheduling plugin.
type LoadAwareSchedulingArgs struct {
	metav1.TypeMeta

	// FilterExpiredNodeMetrics indicates whether to filter nodes where koordlet fails to update NodeMetric.
	// Deprecated: NodeMetric should always be checked for expiration.
	FilterExpiredNodeMetrics *bool
	// NodeMetricExpirationSeconds indicates the NodeMetric expiration in seconds.
	// When NodeMetrics expired, the node is considered abnormal.
	// Default is 180 seconds.
	NodeMetricExpirationSeconds *int64
	// ResourceWeights indicates the weights of resources.
	// The weights of CPU and Memory are both 1 by default.
	ResourceWeights map[corev1.ResourceName]int64
	// UsageThresholds indicates the resource utilization threshold of the whole machine.
	// The default for CPU is 65%, and the default for memory is 95%.
	UsageThresholds map[corev1.ResourceName]int64
	// ProdUsageThresholds indicates the resource utilization threshold of Prod Pods compared to the whole machine.
	// Not enabled by default
	ProdUsageThresholds map[corev1.ResourceName]int64
	// ScoreAccordingProdUsage controls whether to score according to the utilization of Prod Pod
	ScoreAccordingProdUsage bool
	// Estimator indicates the expected Estimator to use
	Estimator string
	// EstimatedScalingFactors indicates the factor when estimating resource usage.
	// The default value of CPU is 85%, and the default value of Memory is 70%.
	EstimatedScalingFactors map[corev1.ResourceName]int64
	// Aggregated supports resource utilization filtering and scoring based on percentile statistics
	Aggregated *LoadAwareSchedulingAggregatedArgs
}

// ScoringStrategyType is a "string" type.
type ScoringStrategyType string

const (
	// MostAllocated strategy favors node with the least amount of available resource
	MostAllocated ScoringStrategyType = "MostAllocated"
	// BalancedAllocation strategy favors nodes with balanced resource usage rate
	BalancedAllocation ScoringStrategyType = "BalancedAllocation"
	// LeastAllocated strategy favors node with the most amount of available resource
	LeastAllocated ScoringStrategyType = "LeastAllocated"
)
  • FilterExpiredNodeMetrics 指定是否过滤 Koordlet 无法更新 NodeMetric 的节点;
  • NodeMetricExpirationSeconds 表示 NodeMetric 过期时间,单位为秒;当NodeMetric过期时,节点被认为异常,默认为180秒;
  • ResourceWeights 表示资源的权重,默认情况下,CPU 和内存的权重都为1;
  • UsageThresholds 表示资源利用率阈值,CPU 的默认值为65%,内存的默认值为95%。
  • EstimatedScalingFactors 表示估计资源使用情况时的系数,CPU 的默认值为85%,内存的默认值为70%。

负载感知调度初始化如下所示:

type Plugin struct {
	handle           framework.Handle
	args             *config.LoadAwareSchedulingArgs
    // Pod 元数据
	podLister        corev1listers.PodLister
    // Node Metrics 指标元数据
	nodeMetricLister slolisters.NodeMetricLister    
	estimator        estimator.Estimator
    // Pod 缓存
	podAssignCache   *podAssignCache
}

func New(args runtime.Object, handle framework.Handle) (framework.Plugin, error) {
	pluginArgs, ok := args.(*config.LoadAwareSchedulingArgs)
	if !ok {
		return nil, fmt.Errorf("want args to be of type LoadAwareSchedulingArgs, got %T", args)
	}

	if err := validation.ValidateLoadAwareSchedulingArgs(pluginArgs); err != nil {
		return nil, err
	}

	frameworkExtender, ok := handle.(frameworkext.ExtendedHandle)
	if !ok {
		return nil, fmt.Errorf("want handle to be of type frameworkext.ExtendedHandle, got %T", handle)
	}

    // Pod 缓存
	assignCache := newPodAssignCache()
    
    // Pod List-Watch 监听机制
	podInformer := frameworkExtender.SharedInformerFactory().Core().V1().Pods()
	frameworkexthelper.ForceSyncFromInformer(context.TODO().Done(), frameworkExtender.SharedInformerFactory(), podInformer.Informer(), assignCache)
	podLister := podInformer.Lister()
    // Node 指标元数据
	nodeMetricLister := frameworkExtender.KoordinatorSharedInformerFactory().Slo().V1alpha1().NodeMetrics().Lister()

	estimator, err := estimator.NewEstimator(pluginArgs, handle)
	if err != nil {
		return nil, err
	}

	return &Plugin{
		handle:           handle,
		args:             pluginArgs,
		podLister:        podLister,
		nodeMetricLister: nodeMetricLister,
		estimator:        estimator,
		podAssignCache:   assignCache,
	}, nil
}

基于 scheduler framework 调度框架拓展了 FilterPlugin、ScorePlugin、ReservePlugin 三个插件,如下所示:

var (
	_ framework.EnqueueExtensions = &Plugin{}

	_ framework.FilterPlugin  = &Plugin{}
	_ framework.ScorePlugin   = &Plugin{}
	_ framework.ReservePlugin = &Plugin{}
)

Filter 拓展插件如下所示:

func (p *Plugin) Filter(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    node := nodeInfo.Node()
    if node == nil {
        return framework.NewStatus(framework.Error, "node not found")
    }
    
    // 如果是 DaemonSet,则不参与过滤 
    if isDaemonSetPod(pod.OwnerReferences) {
        return nil
    }

    // 获取 node 指标数据
    nodeMetric, err := p.nodeMetricLister.Get(node.Name)
    if err != nil {
        // For nodes that lack load information, fall back to the situation where there is no load-aware scheduling.
        // Some nodes in the cluster do not install the koordlet, but users newly created Pod use koord-scheduler to schedule,
        // and the load-aware scheduling itself is an optimization, so we should skip these nodes.
        if errors.IsNotFound(err) {
            return nil
        }
        return framework.NewStatus(framework.Error, err.Error())
    }

    // 获取不到 Node 指标数据,则不参与过滤 
    if p.args.FilterExpiredNodeMetrics != nil && *p.args.FilterExpiredNodeMetrics &&
        p.args.NodeMetricExpirationSeconds != nil && isNodeMetricExpired(nodeMetric, *p.args.NodeMetricExpirationSeconds) {
        return nil
    }

    filterProfile := generateUsageThresholdsFilterProfile(node, p.args)
    if len(filterProfile.ProdUsageThresholds) > 0 && extension.GetPodPriorityClassWithDefault(pod) == extension.PriorityProd {
        // 过滤生产环境 Pod 资源使用率 
        status := p.filterProdUsage(node, nodeMetric, filterProfile.ProdUsageThresholds)
        if !status.IsSuccess() {
            return status
        }
    } else {
        var usageThresholds map[corev1.ResourceName]int64
        if filterProfile.AggregatedUsage != nil {
            usageThresholds = filterProfile.AggregatedUsage.UsageThresholds
        } else {
            usageThresholds = filterProfile.UsageThresholds
        }
        if len(usageThresholds) > 0 {
            // 
            status := p.filterNodeUsage(node, nodeMetric, filterProfile)
            if !status.IsSuccess() {
                return status
            }
        }
    }

    return nil
}

func (p *Plugin) filterProdUsage(node *corev1.Node, nodeMetric *slov1alpha1.NodeMetric, prodUsageThresholds map[corev1.ResourceName]int64) *framework.Status {
	if len(nodeMetric.Status.PodsMetric) == 0 {
		return nil
	}

	// TODO(joseph): maybe we should estimate the Pod that just be scheduled that have not reported
	podMetrics := buildPodMetricMap(p.podLister, nodeMetric, true)
	prodPodUsages, _ := sumPodUsages(podMetrics, nil)
    // 生产环境的 Pod 资源(CPU、Memory等)利用率
	for resourceName, threshold := range prodUsageThresholds {
		if threshold == 0 {
			continue
		}
        // 获取 Node 节点已分配的资源
		allocatable, err := p.estimator.EstimateNode(node)
		if err != nil {
			klog.ErrorS(err, "Failed to EstimateNode", "node", node.Name)
			return nil
		}
		total := allocatable[resourceName]
		if total.IsZero() {
			continue
		}
		used := prodPodUsages[resourceName]
        // 资源利用率 
		usage := int64(math.Round(float64(used.MilliValue()) / float64(total.MilliValue()) * 100))
		if usage >= threshold {
            // 超过阈值的 Pod 不可调度
			return framework.NewStatus(framework.Unschedulable, fmt.Sprintf(ErrReasonUsageExceedThreshold, resourceName))
		}
	}
	return nil
}

func (p *Plugin) filterNodeUsage(node *corev1.Node, nodeMetric *slov1alpha1.NodeMetric, filterProfile *usageThresholdsFilterProfile) *framework.Status {
	if nodeMetric.Status.NodeMetric == nil {
		return nil
	}

	var usageThresholds map[corev1.ResourceName]int64
	if filterProfile.AggregatedUsage != nil {
		usageThresholds = filterProfile.AggregatedUsage.UsageThresholds
	} else {
		usageThresholds = filterProfile.UsageThresholds
	}

	for resourceName, threshold := range usageThresholds {
		if threshold == 0 {
			continue
		}
		allocatable, err := p.estimator.EstimateNode(node)
		if err != nil {
			klog.ErrorS(err, "Failed to EstimateNode", "node", node.Name)
			return nil
		}
		total := allocatable[resourceName]
		if total.IsZero() {
			continue
		}
		// TODO(joseph): maybe we should estimate the Pod that just be scheduled that have not reported
		var nodeUsage *slov1alpha1.ResourceMap
		if filterProfile.AggregatedUsage != nil {
			nodeUsage = getTargetAggregatedUsage(
				nodeMetric,
				filterProfile.AggregatedUsage.UsageAggregatedDuration,
				filterProfile.AggregatedUsage.UsageAggregationType,
			)
		} else {
			nodeUsage = &nodeMetric.Status.NodeMetric.NodeUsage
		}
		if nodeUsage == nil {
			continue
		}

		used := nodeUsage.ResourceList[resourceName]
		usage := int64(math.Round(float64(used.MilliValue()) / float64(total.MilliValue()) * 100))
        // 过滤 Node 节点资源(CPU、Memory等)利用率超过阈值的节点 
		if usage >= threshold {
			reason := ErrReasonUsageExceedThreshold
			if filterProfile.AggregatedUsage != nil {
				reason = ErrReasonAggregatedUsageExceedThreshold
			}
			return framework.NewStatus(framework.Unschedulable, fmt.Sprintf(reason, resourceName))
		}
	}
	return nil
}

func buildPodMetricMap(podLister corev1listers.PodLister, nodeMetric *slov1alpha1.NodeMetric, filterProdPod bool) map[string]corev1.ResourceList {
	if len(nodeMetric.Status.PodsMetric) == 0 {
		return nil
	}
	podMetrics := make(map[string]corev1.ResourceList)
    // 从 nodeMetric.Status.PodsMetric 获取资源使用率 
    // 其中 nodeMetric 指标数据是从 Koordlet Daemonset 中采集的 
	for _, podMetric := range nodeMetric.Status.PodsMetric {
		pod, err := podLister.Pods(podMetric.Namespace).Get(podMetric.Name)
		if err != nil {
			continue
		}
		if filterProdPod && extension.GetPodPriorityClassWithDefault(pod) != extension.PriorityProd {
			continue
		}
		name := getPodNamespacedName(podMetric.Namespace, podMetric.Name)
		podMetrics[name] = podMetric.PodUsage.ResourceList
	}
	return podMetrics
}

func sumPodUsages(podMetrics map[string]corev1.ResourceList, estimatedPods sets.String) (podUsages, estimatedPodsUsages corev1.ResourceList) {
	if len(podMetrics) == 0 {
		return nil, nil
	}
	podUsages = make(corev1.ResourceList)
	estimatedPodsUsages = make(corev1.ResourceList)
	for podName, usage := range podMetrics {
		if estimatedPods.Has(podName) {
			util.AddResourceList(estimatedPodsUsages, usage)
			continue
		}
		util.AddResourceList(podUsages, usage)
	}
	return podUsages, estimatedPodsUsages
}

Filter 插件的主要功能是通过资源阈值过滤 Node 与 Pod。

  • Node CPU 与内存资源利用率是否超过阈值;
  • Pod CPU 与内存资源利用率是否超过阈值;

Score 拓展插件如下所示:

func (p *Plugin) Score(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) (int64, *framework.Status) {
    // 获取 Node 节点信息
	nodeInfo, err := p.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
	if err != nil {
		return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
	}
	node := nodeInfo.Node()
	if node == nil {
		return 0, framework.NewStatus(framework.Error, "node not found")
	}
    // 获取 Node 指标数据
	nodeMetric, err := p.nodeMetricLister.Get(nodeName)
	if err != nil {
		// caused by load-aware scheduling itself is an optimization,
		// so we should skip the node and score the node 0
		if errors.IsNotFound(err) {
			return 0, nil
		}
		return 0, framework.NewStatus(framework.Error, err.Error())
	}
    // Node 数据获取失败,则不参与打分机制
	if p.args.NodeMetricExpirationSeconds != nil && isNodeMetricExpired(nodeMetric, *p.args.NodeMetricExpirationSeconds)          {
		return 0, nil
	}

    // 生产环境 Pod 优先级
	prodPod := extension.GetPodPriorityClassWithDefault(pod) == extension.PriorityProd && p.args.ScoreAccordingProdUsage
	podMetrics := buildPodMetricMap(p.podLister, nodeMetric, prodPod)

    // 估计 Pod 资源
	estimatedUsed, err := p.estimator.EstimatePod(pod)
	if err != nil {
		return 0, nil
	}
	assignedPodEstimatedUsed, estimatedPods := p.estimatedAssignedPodUsed(nodeName, nodeMetric, podMetrics, prodPod)
	for resourceName, value := range assignedPodEstimatedUsed {
		estimatedUsed[resourceName] += value
	}
    // Pod 资源实际使用率 
	podActualUsages, estimatedPodActualUsages := sumPodUsages(podMetrics, estimatedPods)
	if prodPod {
		for resourceName, quantity := range podActualUsages {
			estimatedUsed[resourceName] += getResourceValue(resourceName, quantity)
		}
	} else {
		if nodeMetric.Status.NodeMetric != nil {
			var nodeUsage *slov1alpha1.ResourceMap
			if scoreWithAggregation(p.args.Aggregated) {
				nodeUsage = getTargetAggregatedUsage(nodeMetric, &p.args.Aggregated.ScoreAggregatedDuration, p.args.Aggregated.ScoreAggregationType)
			} else {
				nodeUsage = &nodeMetric.Status.NodeMetric.NodeUsage
			}
			if nodeUsage != nil {
				for resourceName, quantity := range nodeUsage.ResourceList {
					if q := estimatedPodActualUsages[resourceName]; !q.IsZero() {
						quantity = quantity.DeepCopy()
						if quantity.Cmp(q) >= 0 {
							quantity.Sub(q)
						}
					}
					estimatedUsed[resourceName] += getResourceValue(resourceName, quantity)
				}
			}
		}
	}
    
    // 可分配资源
	allocatable, err := p.estimator.EstimateNode(node)
	if err != nil {
		return 0, nil
	}
    // 进行调度打分
    // ResourceWeights indicates the weights of resources.
	// The weights of resources are both 1 by default.
	// ResourceWeights map[corev1.ResourceName]int64
    
    // p.args.ResourceWeights 是来源于 SetDefaults_LoadAwareSchedulingArgs 函数初始化中的 defaultResourceWeights
    // 就是 CPU、Memory 资源
	score := loadAwareSchedulingScorer(p.args.ResourceWeights, estimatedUsed, allocatable)
	return score, nil
}

// SetDefaults_LoadAwareSchedulingArgs sets the default parameters for LoadAwareScheduling plugin.
func SetDefaults_LoadAwareSchedulingArgs(obj *LoadAwareSchedulingArgs) {
	if obj.FilterExpiredNodeMetrics == nil {
		obj.FilterExpiredNodeMetrics = pointer.Bool(true)
	}
	if obj.NodeMetricExpirationSeconds == nil {
		obj.NodeMetricExpirationSeconds = pointer.Int64(defaultNodeMetricExpirationSeconds)
	}
	if len(obj.ResourceWeights) == 0 {
		obj.ResourceWeights = defaultResourceWeights
	}
	if len(obj.UsageThresholds) == 0 {
		obj.UsageThresholds = defaultUsageThresholds
	}
	if obj.EstimatedScalingFactors == nil {
		obj.EstimatedScalingFactors = defaultEstimatedScalingFactors
	} else {
		for k, v := range defaultEstimatedScalingFactors {
			if _, ok := obj.EstimatedScalingFactors[k]; !ok {
				obj.EstimatedScalingFactors[k] = v
			}
		}
	}
}

var (
    defaultNodeMetricExpirationSeconds int64 = 180

	defaultResourceWeights = map[corev1.ResourceName]int64{
		corev1.ResourceCPU:    1,
		corev1.ResourceMemory: 1,
	}

	defaultUsageThresholds = map[corev1.ResourceName]int64{
		corev1.ResourceCPU:    65, // 65%
		corev1.ResourceMemory: 95, // 95%
	}

	defaultEstimatedScalingFactors = map[corev1.ResourceName]int64{
		corev1.ResourceCPU:    85, // 85%
		corev1.ResourceMemory: 70, // 70%
	}
)

// 通过 weight 权重均衡 Pod 得分
func loadAwareSchedulingScorer(resToWeightMap, used map[corev1.ResourceName]int64, allocatable corev1.ResourceList) int64 {
	var nodeScore, weightSum int64
	for resourceName, weight := range resToWeightMap {
		resourceScore := leastRequestedScore(used[resourceName], getResourceValue(resourceName, allocatable[resourceName]))
		nodeScore += resourceScore * weight
		weightSum += weight
	}
	return nodeScore / weightSum
}

// 最低得分
func leastRequestedScore(requested, capacity int64) int64 {
	if capacity == 0 {
		return 0
	}
	if requested > capacity {
		return 0
	}

	return ((capacity - requested) * framework.MaxNodeScore) / capacity
}

Score 插件的主要功能是通过CPU、Memory资源使用率进行打分。

Reserve 与 Unreserve 拓展插件如下所示:

func (p *Plugin) Reserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status {
	p.podAssignCache.assign(nodeName, pod)
	return nil
}

func (p *podAssignCache) assign(nodeName string, pod *corev1.Pod) {
	if nodeName == "" || util.IsPodTerminated(pod) {
		return
	}
	p.lock.Lock()
	defer p.lock.Unlock()
	m := p.podInfoItems[nodeName]
	if m == nil {
		m = make(map[types.UID]*podAssignInfo)
		p.podInfoItems[nodeName] = m
	}
	m[pod.UID] = &podAssignInfo{
		timestamp: timeNowFn(),
		pod:       pod,
	}
}

func IsPodTerminated(pod *corev1.Pod) bool {
	return pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed
}

func (p *Plugin) Unreserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) {
	p.podAssignCache.unAssign(nodeName, pod)
}

func (p *podAssignCache) unAssign(nodeName string, pod *corev1.Pod) {
	if nodeName == "" {
		return
	}
	p.lock.Lock()
	defer p.lock.Unlock()
	delete(p.podInfoItems[nodeName], pod.UID)
	if len(p.podInfoItems[nodeName]) == 0 {
		delete(p.podInfoItems, nodeName)
	}
}


配置

apiVersion: v1
kind: ConfigMap
metadata:
  name: koord-scheduler-config
  ...
data:
  koord-scheduler-config: |
    apiVersion: kubescheduler.config.k8s.io/v1beta2
    kind: KubeSchedulerConfiguration
    profiles:
      - schedulerName: koord-scheduler
        plugins:
          # 启用LoadAwareScheduling插件
          filter:
            enabled:
              - name: LoadAwareScheduling
              ...
          score:
            enabled:
              - name: LoadAwareScheduling
                weight: 1
              ...
          reserve:
            enabled:
              - name: LoadAwareScheduling
          ...
        pluginConfig:
        # 配置插件的阈值和权重
        - name: LoadAwareScheduling
          args:
            apiVersion: kubescheduler.config.k8s.io/v1beta2
            kind: LoadAwareSchedulingArgs
            # 是否过滤koordlet无法更新NodeMetric的节点
            filterExpiredNodeMetrics: true
            # 使用NodeMetric时的到期阈值单位秒
            nodeMetricExpirationSeconds: 300
            # 资源权重
            resourceWeights:
              cpu: 1
              memory: 1
            # 资源利用率的阈值(%)
            usageThresholds:
              cpu: 75
              memory: 85
            # Prod Pods资源利用率阈值(%)
            prodUsageThresholds:
              cpu: 55
              memory: 65
            # 根据Prod的使用情况启用分数
            scoreAccordingProdUsage: true
            # 估计资源使用情况的因素(%)
            estimatedScalingFactors:
              cpu: 80
              memory: 70
            # 根据百分位数统计实现资源利用率过滤和评分
            aggregated:
              usageThresholds:
                cpu: 65
                memory: 75
              usageAggregationType: "p99"
              scoreAggregationType: "p99"
字段说明
filterExpiredNodeMetricsfilterExpiredNodeMetrics 表示是否过滤koordlet更新NodeMetric失败的节点。 默认情况下启用,但在 Helm chart 中,它被禁用。
nodeMetricExpirationSecondsnodeMetricExpirationSeconds 指示 NodeMetric 过期时间,当 NodeMetrics 过期时,节点被认为是异常的。 默认为 180 秒。
resourceWeightsresourceWeights 表示资源的权重。 CPU 和 Memory 的权重默认都是 1。
usageThresholdsusageThresholds 表示整机的资源利用率阈值。 CPU 的默认值为 65%,内存的默认值为 95%。
estimatedScalingFactorsestimatedScalingFactors 表示估计资源使用时的因子。 CPU 默认值为 85%,Memory 默认值为 70%。
prodUsageThresholdsprodUsageThresholds 表示 Prod Pod 相对于整机的资源利用率阈值。 默认情况下不启用。
scoreAccordingProdUsagescoreAccordingProdUsage 控制是否根据 Prod Pod 的利用率进行评分。
aggregatedaggregated 支持基于百分位数统计的资源利用率过滤和评分。

Aggregated 支持的字段:

字段说明
usageThresholdsusageThresholds 表示机器基于百分位统计的资源利用率阈值。
usageAggregationTypeusageAggregationType 表示过滤时机器利用率的百分位类型。 目前支持avgp50p90p95p99
usageAggregatedDurationusageAggregatedDuration 表示过滤时机器利用率百分位数的统计周期。不设置该字段时,调度器默认使用 NodeMetrics 中最大周期的数据。
scoreAggregationTypescoreAggregationType 表示评分时机器利用率的百分位类型。 目前支持avgp50p90p95p99
scoreAggregatedDurationscoreAggregatedDuration 表示打分时 Prod Pod 利用率百分位的统计周期。 不设置该字段时,调度器默认使用 NodeMetrics 中最大周期的数据。

通过插件的配置可以作为集群默认的全局配置,用户也可以通过在节点上附加 annotation 来设置节点维度的负载阈值。

当节点上存在 annotation 时,会根据注解指定的参数进行过滤,Annotation 定义如下:

const (
  AnnotationCustomUsageThresholds = "scheduling.koordinator.sh/usage-thresholds"
)

//CustomUsageThresholds支持用户定义的节点资源利用阈值。
type CustomUsageThresholds struct {
    // 使用阈值表示整个机器的资源利用率阈值。
    UsageThresholds map[corev1.ResourceName]int64 `json:"usageThresholds,omitempty"`
    // ProdUsageThresholds表示与整台机器相比,Prod Pods的资源利用率阈值
    ProdUsageThresholds map[corev1.ResourceName]int64 `json:"prodUsageThresholds,omitempty"`
    // AggregatedUsage支持基于百分位数统计的资源利用率过滤和评分
    AggregatedUsage *CustomAggregatedUsage `json:"aggregatedUsage,omitempty"`
}

type CustomAggregatedUsage struct {
    // 使用阈值表示基于百分位数统计的机器资源利用阈值
    UsageThresholds map[corev1.ResourceName]int64 `json:"usageThresholds,omitempty"`
    // UsageAggregationType表示过滤时机器利用率的百分位数类型
    UsageAggregationType slov1alpha1.AggregationType `json:"usageAggregationType,omitempty"`
    // UsageAggregatedDuration表示过滤时机器利用率的百分位数的统计周期
    UsageAggregatedDuration *metav1.Duration `json:"usageAggregatedDuration,omitempty"`
}



0

评论区