
Kubernetes Core Scheduling Algorithms

lance
2025-03-19
Note: this article was last updated on 2025-05-04.

First, let's look at the startup log of the kube-scheduler-192.168.0.198 Pod. It shows the plugins used by the default scheduler (default-scheduler):

➜  kubernetes git:(v1.26.9-scheduler) kubectl -n kube-system logs -f kube-scheduler-192.168.0.198
W0412 10:30:45.766242       1 feature_gate.go:241] Setting GA feature gate MixedProtocolLBService=true. It will be removed in a future release.
I0412 10:30:45.766367       1 flags.go:64] FLAG: --allow-metric-labels="[]"
I0412 10:30:45.766393       1 flags.go:64] FLAG: --authentication-kubeconfig=""
I0412 10:30:45.766398       1 flags.go:64] FLAG: --authentication-skip-lookup="false"
I0412 10:30:45.766402       1 flags.go:64] FLAG: --authentication-token-webhook-cache-ttl="10s"
I0412 10:30:45.766407       1 flags.go:64] FLAG: --authentication-tolerate-lookup-failure="true"
I0412 10:30:45.766410       1 flags.go:64] FLAG: --authorization-always-allow-paths="[/metrics,/healthz,/readyz,/livez]"
I0412 10:30:45.766418       1 flags.go:64] FLAG: --authorization-kubeconfig=""
I0412 10:30:45.766421       1 flags.go:64] FLAG: --authorization-webhook-cache-authorized-ttl="10s"
I0412 10:30:45.766425       1 flags.go:64] FLAG: --authorization-webhook-cache-unauthorized-ttl="10s"
I0412 10:30:45.766428       1 flags.go:64] FLAG: --bind-address="0.0.0.0"
I0412 10:30:45.766432       1 flags.go:64] FLAG: --cert-dir=""
I0412 10:30:45.766435       1 flags.go:64] FLAG: --client-ca-file=""
I0412 10:30:45.766438       1 flags.go:64] FLAG: --config=""
I0412 10:30:45.766440       1 flags.go:64] FLAG: --contention-profiling="true"
I0412 10:30:45.766444       1 flags.go:64] FLAG: --disabled-metrics="[]"
I0412 10:30:45.766448       1 flags.go:64] FLAG: --feature-gates="MixedProtocolLBService=true"
I0412 10:30:45.766461       1 flags.go:64] FLAG: --help="false"
I0412 10:30:45.766464       1 flags.go:64] FLAG: --http2-max-streams-per-connection="0"
I0412 10:30:45.766471       1 flags.go:64] FLAG: --kube-api-burst="100"
I0412 10:30:45.766476       1 flags.go:64] FLAG: --kube-api-content-type="application/vnd.kubernetes.protobuf"
I0412 10:30:45.766480       1 flags.go:64] FLAG: --kube-api-qps="100"
I0412 10:30:45.766484       1 flags.go:64] FLAG: --kubeconfig="/etc/kubernetes/scheduler.conf"
I0412 10:30:45.766488       1 flags.go:64] FLAG: --leader-elect="true"
I0412 10:30:45.766490       1 flags.go:64] FLAG: --leader-elect-lease-duration="15s"
I0412 10:30:45.766495       1 flags.go:64] FLAG: --leader-elect-renew-deadline="10s"
I0412 10:30:45.766497       1 flags.go:64] FLAG: --leader-elect-resource-lock="leases"
I0412 10:30:45.766501       1 flags.go:64] FLAG: --leader-elect-resource-name="kube-scheduler"
I0412 10:30:45.766503       1 flags.go:64] FLAG: --leader-elect-resource-namespace="kube-system"
I0412 10:30:45.766506       1 flags.go:64] FLAG: --leader-elect-retry-period="2s"
I0412 10:30:45.766509       1 flags.go:64] FLAG: --lock-object-name="kube-scheduler"
I0412 10:30:45.766512       1 flags.go:64] FLAG: --lock-object-namespace="kube-system"
I0412 10:30:45.766515       1 flags.go:64] FLAG: --log-flush-frequency="5s"
I0412 10:30:45.766518       1 flags.go:64] FLAG: --log-json-info-buffer-size="0"
I0412 10:30:45.766528       1 flags.go:64] FLAG: --log-json-split-stream="false"
I0412 10:30:45.766531       1 flags.go:64] FLAG: --logging-format="text"
I0412 10:30:45.766534       1 flags.go:64] FLAG: --master="https://192.168.0.198:6443"
I0412 10:30:45.766537       1 flags.go:64] FLAG: --permit-address-sharing="false"
I0412 10:30:45.766540       1 flags.go:64] FLAG: --permit-port-sharing="false"
I0412 10:30:45.766543       1 flags.go:64] FLAG: --pod-max-in-unschedulable-pods-duration="5m0s"
I0412 10:30:45.766546       1 flags.go:64] FLAG: --profiling="true"
I0412 10:30:45.766549       1 flags.go:64] FLAG: --requestheader-allowed-names="[]"
I0412 10:30:45.766554       1 flags.go:64] FLAG: --requestheader-client-ca-file=""
I0412 10:30:45.766557       1 flags.go:64] FLAG: --requestheader-extra-headers-prefix="[x-remote-extra-]"
I0412 10:30:45.766561       1 flags.go:64] FLAG: --requestheader-group-headers="[x-remote-group]"
I0412 10:30:45.766566       1 flags.go:64] FLAG: --requestheader-username-headers="[x-remote-user]"
I0412 10:30:45.766570       1 flags.go:64] FLAG: --secure-port="10259"
I0412 10:30:45.766573       1 flags.go:64] FLAG: --show-hidden-metrics-for-version=""
I0412 10:30:45.766577       1 flags.go:64] FLAG: --tls-cert-file=""
I0412 10:30:45.766580       1 flags.go:64] FLAG: --tls-cipher-suites="[]"
I0412 10:30:45.766585       1 flags.go:64] FLAG: --tls-min-version=""
I0412 10:30:45.766587       1 flags.go:64] FLAG: --tls-private-key-file=""
I0412 10:30:45.766590       1 flags.go:64] FLAG: --tls-sni-cert-key="[]"
I0412 10:30:45.766595       1 flags.go:64] FLAG: --v="2"
I0412 10:30:45.766600       1 flags.go:64] FLAG: --version="false"
I0412 10:30:45.766606       1 flags.go:64] FLAG: --vmodule=""
I0412 10:30:45.766610       1 flags.go:64] FLAG: --write-config-to=""
I0412 10:30:47.633062       1 serving.go:348] Generated self-signed cert in-memory
W0412 10:30:48.875048       1 authentication.go:339] No authentication-kubeconfig provided in order to lookup client-ca-file in configmap/extension-apiserver-authentication in kube-system, so client certificate authentication won't work.
W0412 10:30:48.891788       1 authentication.go:363] No authentication-kubeconfig provided in order to lookup requestheader-client-ca-file in configmap/extension-apiserver-authentication in kube-system, so request-header client certificate authentication won't work.
W0412 10:30:48.891815       1 authorization.go:194] No authorization-kubeconfig provided, so SubjectAccessReview of authorization tokens won't work.
I0412 10:30:57.369970       1 configfile.go:105] "Using component config" config=<
	apiVersion: kubescheduler.config.k8s.io/v1
	clientConnection:
	  acceptContentTypes: ""
	  burst: 100
	  contentType: application/vnd.kubernetes.protobuf
	  kubeconfig: /etc/kubernetes/scheduler.conf
	  qps: 100
	enableContentionProfiling: true
	enableProfiling: true
	kind: KubeSchedulerConfiguration
	leaderElection:
	  leaderElect: true
	  leaseDuration: 15s
	  renewDeadline: 10s
	  resourceLock: leases
	  resourceName: kube-scheduler
	  resourceNamespace: kube-system
	  retryPeriod: 2s
	parallelism: 16
	percentageOfNodesToScore: 0
	podInitialBackoffSeconds: 1
	podMaxBackoffSeconds: 10
	profiles:
	- pluginConfig:
	  - args:
	      apiVersion: kubescheduler.config.k8s.io/v1
	      kind: DefaultPreemptionArgs
	      minCandidateNodesAbsolute: 100
	      minCandidateNodesPercentage: 10
	    name: DefaultPreemption
	  - args:
	      apiVersion: kubescheduler.config.k8s.io/v1
	      hardPodAffinityWeight: 1
	      kind: InterPodAffinityArgs
	    name: InterPodAffinity
	  - args:
	      apiVersion: kubescheduler.config.k8s.io/v1
	      kind: NodeAffinityArgs
	    name: NodeAffinity
	  - args:
	      apiVersion: kubescheduler.config.k8s.io/v1
	      kind: NodeResourcesBalancedAllocationArgs
	      resources:
	      - name: cpu
	        weight: 1
	      - name: memory
	        weight: 1
	    name: NodeResourcesBalancedAllocation
	  - args:
	      apiVersion: kubescheduler.config.k8s.io/v1
	      kind: NodeResourcesFitArgs
	      scoringStrategy:
	        resources:
	        - name: cpu
	          weight: 1
	        - name: memory
	          weight: 1
	        type: LeastAllocated
	    name: NodeResourcesFit
	  - args:
	      apiVersion: kubescheduler.config.k8s.io/v1
	      defaultingType: System
	      kind: PodTopologySpreadArgs
	    name: PodTopologySpread
	  - args:
	      apiVersion: kubescheduler.config.k8s.io/v1
	      bindTimeoutSeconds: 600
	      kind: VolumeBindingArgs
	    name: VolumeBinding
	  plugins:
	    bind: {}
	    filter: {}
	    multiPoint:
	      enabled:
	      - name: PrioritySort
	        weight: 0
	      - name: NodeUnschedulable
	        weight: 0
	      - name: NodeName
	        weight: 0
	      - name: TaintToleration
	        weight: 3
	      - name: NodeAffinity
	        weight: 2
	      - name: NodePorts
	        weight: 0
	      - name: NodeResourcesFit
	        weight: 1
	      - name: VolumeRestrictions
	        weight: 0
	      - name: EBSLimits
	        weight: 0
	      - name: GCEPDLimits
	        weight: 0
	      - name: NodeVolumeLimits
	        weight: 0
	      - name: AzureDiskLimits
	        weight: 0
	      - name: VolumeBinding
	        weight: 0
	      - name: VolumeZone
	        weight: 0
	      - name: PodTopologySpread
	        weight: 2
	      - name: InterPodAffinity
	        weight: 2
	      - name: DefaultPreemption
	        weight: 0
	      - name: NodeResourcesBalancedAllocation
	        weight: 1
	      - name: ImageLocality
	        weight: 1
	      - name: DefaultBinder
	        weight: 0
	    permit: {}
	    postBind: {}
	    postFilter: {}
	    preBind: {}
	    preEnqueue: {}
	    preFilter: {}
	    preScore: {}
	    queueSort: {}
	    reserve: {}
	    score: {}
	  schedulerName: default-scheduler
 >
I0412 10:30:57.370358       1 server.go:152] "Starting Kubernetes Scheduler" version="v1.26.9"
I0412 10:30:57.370388       1 server.go:154] "Golang settings" GOGC="" GOMAXPROCS="" GOTRACEBACK=""
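The weight values in the multiPoint list above only affect score plugins. As far as I understand the scheduling framework, each score plugin returns a normalized score between 0 and MaxNodeScore (100) for every node, that score is multiplied by the plugin's weight, and the products are summed into the node's total score. A minimal sketch of that aggregation for a subset of the score plugins; the pluginScore type is hypothetical and the per-plugin scores are invented, only the weights are copied from the log:

```go
package main

// pluginScore is a hypothetical record of one score plugin's result for a node:
// the normalized score (0..100) and the plugin's configured weight.
type pluginScore struct {
	Name   string
	Weight int64
	Score  int64
}

// totalScore multiplies every plugin's normalized score by its weight
// and sums the products into the node's total score.
func totalScore(scores []pluginScore) int64 {
	var total int64
	for _, s := range scores {
		total += s.Score * s.Weight
	}
	return total
}

// Weights copied from the log above; the scores are invented for illustration.
var nodeA = []pluginScore{
	{"TaintToleration", 3, 100},
	{"NodeAffinity", 2, 0},
	{"NodeResourcesFit", 1, 80},
	{"NodeResourcesBalancedAllocation", 1, 90},
	{"ImageLocality", 1, 0},
}

var nodeB = []pluginScore{
	{"TaintToleration", 3, 100},
	{"NodeAffinity", 2, 100},
	{"NodeResourcesFit", 1, 40},
	{"NodeResourcesBalancedAllocation", 1, 60},
	{"ImageLocality", 1, 50},
}
```

With these numbers nodeB (650) beats nodeA (470), mostly because NodeAffinity carries weight 2 while the resource plugins carry weight 1.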


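We previously analyzed the in-tree (built-in) and out-of-tree (extension) algorithms; next, let's walk through a few core examples. The core built-in scheduling plugins can be found at https://github.com/kubernetes/kubernetes/tree/master/pkg/scheduler/framework/plugins . The excerpts below follow the master branch, so details may differ from the v1.26.9 scheduler shown in the log above.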

in-tree

noderesources

The resource scoring logic shared by these strategies lives in pkg/scheduler/framework/plugins/noderesources/resource_allocation.go:

// scorer is decorator for resourceAllocationScorer
type scorer func(args *config.NodeResourcesFitArgs) *resourceAllocationScorer

// resourceAllocationScorer contains information to calculate resource allocation score.
type resourceAllocationScorer struct {
	Name string
	// used to decide whether to use Requested or NonZeroRequested for
	// cpu and memory.
	useRequested bool
	scorer       func(requested, allocable []int64) int64
	resources    []config.ResourceSpec
}

// score will use `scorer` function to calculate the score.
func (r *resourceAllocationScorer) score(
	ctx context.Context,
	pod *v1.Pod,
	nodeInfo *framework.NodeInfo,
	podRequests []int64) (int64, *framework.Status) {
	logger := klog.FromContext(ctx)
	node := nodeInfo.Node()

	// resources not set, nothing scheduled,
	// no resources configured: return score 0 together with an error status.
	if len(r.resources) == 0 {
		return 0, framework.NewStatus(framework.Error, "resources not found")
	}

	requested := make([]int64, len(r.resources))
	allocatable := make([]int64, len(r.resources))
	for i := range r.resources {
		alloc, req := r.calculateResourceAllocatableRequest(logger, nodeInfo, v1.ResourceName(r.resources[i].Name), podRequests[i])
		// Only fill the extended resource entry when it's non-zero.
		if alloc == 0 {
			continue
		}
		allocatable[i] = alloc
		requested[i] = req
	}

	score := r.scorer(requested, allocatable)

	if loggerV := logger.V(10); loggerV.Enabled() { // Serializing these maps is costly.
		loggerV.Info("Listed internal info for allocatable resources, requested resources and score", "pod",
			klog.KObj(pod), "node", klog.KObj(node), "resourceAllocationScorer", r.Name,
			"allocatableResource", allocatable, "requestedResource", requested, "resourceScore", score,
		)
	}

	return score, nil
}

// calculateResourceAllocatableRequest returns 2 parameters:
// - 1st param: quantity of allocatable resource on the node.
// - 2nd param: aggregated quantity of requested resource on the node.
// Note: if it's an extended resource, and the pod doesn't request it, (0, 0) is returned.
func (r *resourceAllocationScorer) calculateResourceAllocatableRequest(logger klog.Logger, nodeInfo *framework.NodeInfo, resource v1.ResourceName, podRequest int64) (int64, int64) {
	// Aggregated requests already on the node: NonZeroRequested by default, Requested if useRequested is set
	requested := nodeInfo.NonZeroRequested
	if r.useRequested {
		requested = nodeInfo.Requested
	}

	// If it's an extended resource, and the pod doesn't request it. We return (0, 0)
	// as an implication to bypass scoring on this resource.
	if podRequest == 0 && schedutil.IsScalarResourceName(resource) {
		return 0, 0
	}
	switch resource {
	// CPU
	case v1.ResourceCPU:
		return nodeInfo.Allocatable.MilliCPU, (requested.MilliCPU + podRequest)
	// Memory
	case v1.ResourceMemory:
		return nodeInfo.Allocatable.Memory, (requested.Memory + podRequest)
	// Ephemeral storage
	case v1.ResourceEphemeralStorage:
		return nodeInfo.Allocatable.EphemeralStorage, (nodeInfo.Requested.EphemeralStorage + podRequest)
	default:
		// Extended (scalar) resources
		if _, exists := nodeInfo.Allocatable.ScalarResources[resource]; exists {
			return nodeInfo.Allocatable.ScalarResources[resource], (nodeInfo.Requested.ScalarResources[resource] + podRequest)
		}
	}
	logger.V(10).Info("Requested resource is omitted for node score calculation", "resourceName", resource)
	return 0, 0
}

// calculatePodResourceRequest returns the total non-zero requests. If Overhead is defined for the pod
// the Overhead is added to the result.
func (r *resourceAllocationScorer) calculatePodResourceRequest(pod *v1.Pod, resourceName v1.ResourceName) int64 {
	// owner: @vinaykul
	// kep: http://kep.k8s.io/1287
	// alpha: v1.27
	// Enables In-Place Pod Vertical Scaling
	// InPlacePodVerticalScaling featuregate.Feature = "InPlacePodVerticalScaling"
	// Whether the InPlacePodVerticalScaling feature gate is enabled

	opts := resourcehelper.PodResourcesOptions{
		InPlacePodVerticalScalingEnabled: utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling),
	}
    
	// const (
	//	// DefaultMilliCPURequest defines default milli cpu request number.
	//	DefaultMilliCPURequest int64 = 100 // 0.1 core
	//	// DefaultMemoryRequest defines default memory request size.
	//	DefaultMemoryRequest int64 = 200 * 1024 * 1024 // 200 MB
	// )

	// With NonZeroRequested, containers that set no CPU or memory request are counted as requesting 0.1 core / 200 MB
	if !r.useRequested {
		opts.NonMissingContainerRequests = v1.ResourceList{
			v1.ResourceCPU:    *resource.NewMilliQuantity(schedutil.DefaultMilliCPURequest, resource.DecimalSI),
			v1.ResourceMemory: *resource.NewQuantity(schedutil.DefaultMemoryRequest, resource.DecimalSI),
		}
	}

	requests := resourcehelper.PodRequests(pod, opts)

	quantity := requests[resourceName]
	if resourceName == v1.ResourceCPU {
		return quantity.MilliValue()
	}
	return quantity.Value()
}

func (r *resourceAllocationScorer) calculatePodResourceRequestList(pod *v1.Pod, resources []config.ResourceSpec) []int64 {
	podRequests := make([]int64, len(resources))
	for i := range resources {
		podRequests[i] = r.calculatePodResourceRequest(pod, v1.ResourceName(resources[i].Name))
	}
	return podRequests
}
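When useRequested is false (as for the Fit scoring strategies in nodeResourceStrategyTypeMap; BalancedAllocation sets useRequested: true), containers that omit a CPU or memory request are counted with the 0.1 core / 200 MB defaults. The sketch below illustrates that rule with a hypothetical nonZeroPodRequest helper working on plain maps instead of v1.ResourceList. As I read NonMissingContainerRequests, only a missing request is replaced, while an explicit 0 is kept; init containers and pod overhead are ignored here for brevity.

```go
package main

// Defaults as quoted in the commented-out const block above.
const (
	defaultMilliCPURequest int64 = 100               // 0.1 core
	defaultMemoryRequest   int64 = 200 * 1024 * 1024 // 200 MB
)

// nonZeroPodRequest (hypothetical) sums the CPU (millicores) and memory (bytes)
// requests of a pod's regular containers, substituting the defaults only when a
// container does not set the request at all.
func nonZeroPodRequest(containers []map[string]int64) (milliCPU, memory int64) {
	for _, req := range containers {
		if v, ok := req["cpu"]; ok {
			milliCPU += v
		} else {
			milliCPU += defaultMilliCPURequest
		}
		if v, ok := req["memory"]; ok {
			memory += v
		} else {
			memory += defaultMemoryRequest
		}
	}
	return milliCPU, memory
}
```

For a pod with one container requesting only 500m CPU and a second container with no requests, this yields 600m CPU and 400 MB of memory.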

The built-in plugins are registered as follows:

registry := runtime.Registry{
	...
	noderesources.Name:                   runtime.FactoryAdapter(fts, noderesources.NewFit),
	noderesources.BalancedAllocationName: runtime.FactoryAdapter(fts, noderesources.NewBalancedAllocation),
	...
}

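pkg/scheduler/framework/plugins/noderesources/balanced_allocation.go scores how evenly a node's resources (CPU and memory by default) are utilized, based on the standard deviation of their utilization fractions: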

// BalancedAllocation is a score plugin that calculates the difference between the cpu and memory fraction
// of capacity, and prioritizes the host based on how close the two metrics are to each other.
type BalancedAllocation struct {
	handle framework.Handle
	resourceAllocationScorer
}

var _ framework.PreScorePlugin = &BalancedAllocation{}
var _ framework.ScorePlugin = &BalancedAllocation{}

// BalancedAllocationName is the name of the plugin used in the plugin registry and configurations.
const (
	BalancedAllocationName = names.NodeResourcesBalancedAllocation

	// balancedAllocationPreScoreStateKey is the key in CycleState to NodeResourcesBalancedAllocation pre-computed data for Scoring.
	balancedAllocationPreScoreStateKey = "PreScore" + BalancedAllocationName
)

// balancedAllocationPreScoreState computed at PreScore and used at Score.
type balancedAllocationPreScoreState struct {
	// podRequests have the same order of the resources defined in NodeResourcesFitArgs.Resources,
	// same for other place we store a list like that.
	podRequests []int64
}

// Clone implements the mandatory Clone interface. We don't really copy the data since
// there is no need for that.
func (s *balancedAllocationPreScoreState) Clone() framework.StateData {
	return s
}

// PreScore calculates incoming pod's resource requests and writes them to the cycle state used.
func (ba *BalancedAllocation) PreScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status {
	state := &balancedAllocationPreScoreState{
		podRequests: ba.calculatePodResourceRequestList(pod, ba.resources),
	}
	cycleState.Write(balancedAllocationPreScoreStateKey, state)
	return nil
}

// getBalancedAllocationPreScoreState reads the state written by PreScore back from the CycleState.
func getBalancedAllocationPreScoreState(cycleState *framework.CycleState) (*balancedAllocationPreScoreState, error) {
	c, err := cycleState.Read(balancedAllocationPreScoreStateKey)
	if err != nil {
		return nil, fmt.Errorf("reading %q from cycleState: %w", balancedAllocationPreScoreStateKey, err)
	}

	s, ok := c.(*balancedAllocationPreScoreState)
	if !ok {
		return nil, fmt.Errorf("invalid PreScore state, got type %T", c)
	}
	return s, nil
}

// Name returns name of the plugin. It is used in logs, etc.
func (ba *BalancedAllocation) Name() string {
	return BalancedAllocationName
}

// Score invoked at the score extension point.
func (ba *BalancedAllocation) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
	nodeInfo, err := ba.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
	if err != nil {
		return 0, framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", nodeName, err))
	}

	s, err := getBalancedAllocationPreScoreState(state)
	if err != nil {
		s = &balancedAllocationPreScoreState{podRequests: ba.calculatePodResourceRequestList(pod, ba.resources)}
	}

	// ba.score favors nodes with balanced resource usage rate.
	// It calculates the standard deviation for those resources and prioritizes the node based on how close the usage of those resources is to each other.
	// Detail: score = (1 - std) * MaxNodeScore, where std is calculated by the root square of Σ((fraction(i)-mean)^2)/len(resources)
	// The algorithm is partly inspired by:
	// "Wei Huang et al. An Energy Efficient Virtual Machine Placement Algorithm with Balanced Resource Utilization"
	return ba.score(ctx, pod, nodeInfo, s.podRequests)
}
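PreScore computes the pod's requests once per scheduling cycle and stores them in the CycleState; Score reads them back for every node and, if the state is missing or has the wrong type, recomputes them instead of failing. A minimal sketch of that pattern, using a hypothetical, lock-free cycleState in place of framework.CycleState:

```go
package main

import "fmt"

// cycleState is a hypothetical, lock-free stand-in for framework.CycleState.
type cycleState struct {
	storage map[string]interface{}
}

func newCycleState() *cycleState {
	return &cycleState{storage: map[string]interface{}{}}
}

func (c *cycleState) Write(key string, v interface{}) { c.storage[key] = v }

func (c *cycleState) Read(key string) (interface{}, error) {
	v, ok := c.storage[key]
	if !ok {
		return nil, fmt.Errorf("reading %q from cycleState: not found", key)
	}
	return v, nil
}

// preScoreState plays the role of balancedAllocationPreScoreState.
type preScoreState struct {
	podRequests []int64
}

// Same key format as balancedAllocationPreScoreStateKey: "PreScore" + plugin name.
const preScoreKey = "PreScore" + "NodeResourcesBalancedAllocation"

// podRequestsForScore mirrors Score above: use the PreScore result if present
// and of the right type, otherwise recompute the pod requests.
func podRequestsForScore(cs *cycleState, recompute func() []int64) []int64 {
	c, err := cs.Read(preScoreKey)
	if err != nil {
		return recompute()
	}
	s, ok := c.(*preScoreState)
	if !ok {
		return recompute()
	}
	return s.podRequests
}
```

The fallback keeps Score usable even when PreScore was skipped, at the cost of recomputing the requests for that node.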

// ScoreExtensions of the Score plugin.
func (ba *BalancedAllocation) ScoreExtensions() framework.ScoreExtensions {
	return nil
}

// NewBalancedAllocation initializes a new plugin and returns it.
func NewBalancedAllocation(_ context.Context, baArgs runtime.Object, h framework.Handle, fts feature.Features) (framework.Plugin, error) {
	args, ok := baArgs.(*config.NodeResourcesBalancedAllocationArgs)
	if !ok {
		return nil, fmt.Errorf("want args to be of type NodeResourcesBalancedAllocationArgs, got %T", baArgs)
	}

	if err := validation.ValidateNodeResourcesBalancedAllocationArgs(nil, args); err != nil {
		return nil, err
	}

	return &BalancedAllocation{
		handle: h,
		resourceAllocationScorer: resourceAllocationScorer{
			Name:         BalancedAllocationName,
			// balancedResourceScorer is the scoring function of this plugin
			scorer:       balancedResourceScorer,
			useRequested: true,
			resources:    args.Resources,
		},
	}, nil
}

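// balancedResourceScorer computes the ratio requested/allocatable for each resource and then the standard deviation of those ratios.
// The standard deviation measures how far the ratios deviate from their mean: the smaller it is, the more evenly the node's resource types are utilized, and the better suited the node is.
// It returns a score; the higher the score, the more balanced the node's resource allocation.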
func balancedResourceScorer(requested, allocable []int64) int64 {
	var resourceToFractions []float64
	var totalFraction float64
    
	// For each resource, compute the ratio of requested to allocatable (capped at 1) and append it to resourceToFractions;
	// accumulate the sum of all ratios in totalFraction.
	for i := range requested {
		if allocable[i] == 0 {
			continue
		}
		fraction := float64(requested[i]) / float64(allocable[i])
		if fraction > 1 {
			fraction = 1
		}
		totalFraction += fraction
		resourceToFractions = append(resourceToFractions, fraction)
	}

	// Standard deviation
	std := 0.0

	// For most cases, resources are limited to cpu and memory, the std could be simplified to std := (fraction1-fraction2)/2
	// len(fractions) > 2: calculate std based on the well-known formula - root square of Σ((fraction(i)-mean)^2)/len(fractions)
	// Otherwise, set the std to zero is enough.
	// Compute the standard deviation of resourceToFractions
	if len(resourceToFractions) == 2 {
		std = math.Abs((resourceToFractions[0] - resourceToFractions[1]) / 2)

	} else if len(resourceToFractions) > 2 {
		mean := totalFraction / float64(len(resourceToFractions))
		var sum float64
		for _, fraction := range resourceToFractions {
			sum = sum + (fraction-mean)*(fraction-mean)
		}
		std = math.Sqrt(sum / float64(len(resourceToFractions)))
	}
    
	// Return (1 - std) * MaxNodeScore:
	// the smaller the standard deviation (i.e. the more balanced the usage), the higher the score.
	// STD (standard deviation) is always a positive value. 1-deviation lets the score to be higher for node which has least deviation and
	// multiplying it with `MaxNodeScore` provides the scaling factor needed.
	return int64((1 - std) * float64(framework.MaxNodeScore))
}
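Why the two-resource shortcut in balancedResourceScorer is exact: for fractions a and b the mean is (a+b)/2, so both deviations have magnitude |a-b|/2, and the population standard deviation is sqrt(2·((a-b)/2)²/2) = |a-b|/2. The sketch below checks this numerically and converts a deviation into a score, assuming MaxNodeScore = 100: a node at 75% CPU / 25% memory gets std 0.25 and score 75, while a node at 50% / 50% gets 100.

```go
package main

import "math"

// populationStd is the general formula used for more than two resources:
// sqrt(Σ(f_i - mean)^2 / n).
func populationStd(fractions []float64) float64 {
	var total float64
	for _, f := range fractions {
		total += f
	}
	mean := total / float64(len(fractions))
	var sum float64
	for _, f := range fractions {
		sum += (f - mean) * (f - mean)
	}
	return math.Sqrt(sum / float64(len(fractions)))
}

// twoResourceStd is the shortcut balancedResourceScorer uses for exactly two resources.
func twoResourceStd(f1, f2 float64) float64 {
	return math.Abs((f1 - f2) / 2)
}

// balancedScore turns a deviation into a score, assuming MaxNodeScore = 100.
func balancedScore(std float64) int64 {
	return int64((1 - std) * 100)
}
```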


pkg/scheduler/framework/plugins/noderesources/fit.go initializes the scoring strategy (LeastAllocated, MostAllocated, or RequestedToCapacityRatio), as shown below:

// NewFit initializes a new plugin and returns it.
func NewFit(_ context.Context, plArgs runtime.Object, h framework.Handle, fts feature.Features) (framework.Plugin, error) {
	args, ok := plArgs.(*config.NodeResourcesFitArgs)
	if !ok {
		return nil, fmt.Errorf("want args to be of type NodeResourcesFitArgs, got %T", plArgs)
	}
	if err := validation.ValidateNodeResourcesFitArgs(nil, args); err != nil {
		return nil, err
	}

	if args.ScoringStrategy == nil {
		return nil, fmt.Errorf("scoring strategy not specified")
	}

	strategy := args.ScoringStrategy.Type
	// Look up the scorer for the configured node-resource scoring strategy
	scorePlugin, exists := nodeResourceStrategyTypeMap[strategy]
	if !exists {
		return nil, fmt.Errorf("scoring strategy %s is not supported", strategy)
	}

	return &Fit{
		ignoredResources:                sets.New(args.IgnoredResources...),
		ignoredResourceGroups:           sets.New(args.IgnoredResourceGroups...),
		enableInPlacePodVerticalScaling: fts.EnableInPlacePodVerticalScaling,
		enableSidecarContainers:         fts.EnableSidecarContainers,
		handle:                          h,
		// Scorer built for the selected strategy
		resourceAllocationScorer:        *scorePlugin(args),
	}, nil
}

// nodeResourceStrategyTypeMap maps strategy to scorer implementation
var nodeResourceStrategyTypeMap = map[config.ScoringStrategyType]scorer{
	config.LeastAllocated: func(args *config.NodeResourcesFitArgs) *resourceAllocationScorer {
		resources := args.ScoringStrategy.Resources
		return &resourceAllocationScorer{
			Name:      string(config.LeastAllocated),
			// Favor nodes with the fewest requested resources
			scorer:    leastResourceScorer(resources),
			resources: resources,
		}
	},
	config.MostAllocated: func(args *config.NodeResourcesFitArgs) *resourceAllocationScorer {
		resources := args.ScoringStrategy.Resources
		return &resourceAllocationScorer{
			Name:      string(config.MostAllocated),
			// Favor nodes with the most requested resources
			scorer:    mostResourceScorer(resources),
			resources: resources,
		}
	},
	config.RequestedToCapacityRatio: func(args *config.NodeResourcesFitArgs) *resourceAllocationScorer {
		resources := args.ScoringStrategy.Resources
		return &resourceAllocationScorer{
			Name:      string(config.RequestedToCapacityRatio),
			// Score by the requested-to-capacity ratio (configurable shape)
			scorer:    requestedToCapacityRatioScorer(resources, args.ScoringStrategy.RequestedToCapacityRatio.Shape),
			resources: resources,
		}
	},
}

pkg/scheduler/framework/plugins/noderesources/least_allocated.go is shown below:

// leastResourceScorer favors nodes with fewer requested resources.
// It calculates the percentage of memory, CPU and other resources requested by pods scheduled on the node, and
// prioritizes based on the minimum of the average of the fraction of requested to capacity.
//
// Details:
// (cpu((capacity-requested)*MaxNodeScore*cpuWeight/capacity) + memory((capacity-requested)*MaxNodeScore*memoryWeight/capacity) + ...)/weightSum

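// It computes the unused amount (allocatable - requested) of each resource and combines the per-resource scores into a weighted average.
// It returns a score; the higher it is, the more unused resources the node has.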
func leastResourceScorer(resources []config.ResourceSpec) func([]int64, []int64) int64 {
	return func(requested, allocable []int64) int64 {
		var nodeScore, weightSum int64
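		// For each resource, score the unused capacity on a 0-MaxNodeScore scale (framework.MaxNodeScore) and multiply it by the resource weight;
		// sum the weighted scores and divide by the sum of the weights to get the weighted average;
		// return that average.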
        
		for i := range requested {
			if allocable[i] == 0 {
				continue
			}
			weight := resources[i].Weight
			resourceScore := leastRequestedScore(requested[i], allocable[i])
			nodeScore += resourceScore * weight
			weightSum += weight
		}
		if weightSum == 0 {
			return 0
		}
		return nodeScore / weightSum
	}
}

// The unused capacity is calculated on a scale of 0-MaxNodeScore
// 0 being the lowest priority and `MaxNodeScore` being the highest.
// The more unused resources the higher the score is.
func leastRequestedScore(requested, capacity int64) int64 {
	if capacity == 0 {
		return 0
	}
	if requested > capacity {
		return 0
	}

	return ((capacity - requested) * framework.MaxNodeScore) / capacity
}
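A worked example of leastResourceScorer, assuming MaxNodeScore = 100 and that requested already includes the incoming pod (as returned by calculateResourceAllocatableRequest): with 1000m of 4000m CPU and 6 GiB of 8 GiB memory requested, CPU scores 75 and memory 25, so equal weights give 50, and a CPU weight of 3 gives (3·75 + 25)/4 = 62 after integer division. weightedLeast is a hypothetical standalone restatement of the closure:

```go
package main

const maxNodeScore int64 = 100 // assumed value of framework.MaxNodeScore

// leastRequestedScore restates the per-resource formula above.
func leastRequestedScore(requested, capacity int64) int64 {
	if capacity == 0 || requested > capacity {
		return 0
	}
	return ((capacity - requested) * maxNodeScore) / capacity
}

// weightedLeast is a hypothetical restatement of the closure returned by
// leastResourceScorer; weights[i] stands in for resources[i].Weight.
func weightedLeast(requested, allocatable, weights []int64) int64 {
	var nodeScore, weightSum int64
	for i := range requested {
		if allocatable[i] == 0 {
			continue
		}
		nodeScore += leastRequestedScore(requested[i], allocatable[i]) * weights[i]
		weightSum += weights[i]
	}
	if weightSum == 0 {
		return 0
	}
	return nodeScore / weightSum
}
```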

pkg/scheduler/framework/plugins/noderesources/most_allocated.go is shown below:

// mostResourceScorer favors nodes with most requested resources.
// It calculates the percentage of memory and CPU requested by pods scheduled on the node, and prioritizes
// based on the maximum of the average of the fraction of requested to capacity.
//
// Details:
// (cpu(MaxNodeScore * requested * cpuWeight / capacity) + memory(MaxNodeScore * requested * memoryWeight / capacity) + ...) / weightSum

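// It computes how much of each resource is requested (requested/allocatable) and combines the per-resource scores into a weighted average.
// It returns a score; the higher it is, the more of the node's resources are already requested.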
func mostResourceScorer(resources []config.ResourceSpec) func(requested, allocable []int64) int64 {
	return func(requested, allocable []int64) int64 {
		var nodeScore, weightSum int64
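		// For each resource, score the requested amount on a 0-MaxNodeScore scale (framework.MaxNodeScore) and multiply it by the resource weight;
		// sum the weighted scores and divide by the sum of the weights to get the weighted average;
		// return that average.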
		for i := range requested {
			if allocable[i] == 0 {
				continue
			}
			weight := resources[i].Weight
			resourceScore := mostRequestedScore(requested[i], allocable[i])
			nodeScore += resourceScore * weight
			weightSum += weight
		}
		if weightSum == 0 {
			return 0
		}
		return nodeScore / weightSum
	}
}

// The used capacity is calculated on a scale of 0-MaxNodeScore (MaxNodeScore is
// constant with value set to 100).
// 0 being the lowest priority and 100 being the highest.
// The more resources are used the higher the score is. This function
// is almost a reversed version of noderesources.leastRequestedScore.
func mostRequestedScore(requested, capacity int64) int64 {
	if capacity == 0 {
		return 0
	}
	if requested > capacity {
		// `requested` might be greater than `capacity` because pods with no
		// requests get minimum values.
		requested = capacity
	}

	return (requested * framework.MaxNodeScore) / capacity
}
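LeastAllocated tends to spread pods across nodes, while MostAllocated packs them (bin packing). The sketch below uses a hypothetical single-resource (CPU) node model, assumes MaxNodeScore = 100, and scores two nodes for a pod requesting 500m: the idle node (1000m of 4000m after placement) gets 75 under LeastAllocated and 25 under MostAllocated, the busy node (3500m of 4000m) gets 12 and 87, so the two strategies pick different nodes. It also shows the clamp: an overcommitted resource scores 100 under MostAllocated but 0 under LeastAllocated.

```go
package main

const maxNodeScore int64 = 100 // assumed value of framework.MaxNodeScore

func leastRequestedScore(requested, capacity int64) int64 {
	if capacity == 0 || requested > capacity {
		return 0
	}
	return ((capacity - requested) * maxNodeScore) / capacity
}

func mostRequestedScore(requested, capacity int64) int64 {
	if capacity == 0 {
		return 0
	}
	if requested > capacity {
		requested = capacity // clamp, as in the original
	}
	return (requested * maxNodeScore) / capacity
}

// node is a hypothetical single-resource (CPU, millicores) view of a node.
type node struct {
	Name      string
	Requested int64 // already requested by pods on the node
	Capacity  int64 // allocatable
}

// pick returns the highest-scoring node for a pod requesting podMilliCPU,
// using the given per-resource scoring function; ties keep the first node.
func pick(nodes []node, podMilliCPU int64, score func(requested, capacity int64) int64) string {
	best, bestScore := "", int64(-1)
	for _, n := range nodes {
		if s := score(n.Requested+podMilliCPU, n.Capacity); s > bestScore {
			best, bestScore = n.Name, s
		}
	}
	return best
}
```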

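A broken linear function is a piecewise linear function: it is made up of several linear segments, each of the form y = ax + b, and each segment meets its neighbor at their shared endpoint. In Kubernetes resource scheduling, such a function maps resource utilization to a scheduling score: utilization is the function's argument and the score is its value. Looking up a node's utilization on this function yields its score, so the scheduling policy adapts to actual resource usage.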

pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio.go is shown below:

const maxUtilization = 100

// buildRequestedToCapacityRatioScorerFunction allows users to apply bin packing
// on core resources like CPU, Memory as well as extended resources like accelerators.
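// buildRequestedToCapacityRatioScorerFunction takes a scoring function shape (scoringFunctionShape) and a list of resource specs (resources), and returns a function over requested and allocatable resources that scores each resource and then computes the weighted average as the node score (resources that score 0 are left out of the average).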
func buildRequestedToCapacityRatioScorerFunction(scoringFunctionShape helper.FunctionShape, resources []config.ResourceSpec) func([]int64, []int64) int64 {
	rawScoringFunction := helper.BuildBrokenLinearFunction(scoringFunctionShape)
	resourceScoringFunction := func(requested, capacity int64) int64 {
		if capacity == 0 || requested > capacity {
			return rawScoringFunction(maxUtilization)
		}

		return rawScoringFunction(requested * maxUtilization / capacity)
	}
	return func(requested, allocable []int64) int64 {
		var nodeScore, weightSum int64
		for i := range requested {
			if allocable[i] == 0 {
				continue
			}
			weight := resources[i].Weight
			resourceScore := resourceScoringFunction(requested[i], allocable[i])
			if resourceScore > 0 {
				nodeScore += resourceScore * weight
				weightSum += weight
			}
		}
		if weightSum == 0 {
			return 0
		}
		return int64(math.Round(float64(nodeScore) / float64(weightSum)))
	}
}

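// requestedToCapacityRatioScorer takes the resource specs and the utilization shape points (shape), converts them into a scoring function shape (rescaling each score to the scheduler's range), and calls buildRequestedToCapacityRatioScorerFunction to return the scoring function.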
func requestedToCapacityRatioScorer(resources []config.ResourceSpec, shape []config.UtilizationShapePoint) func([]int64, []int64) int64 {
	shapes := make([]helper.FunctionShapePoint, 0, len(shape))
	for _, point := range shape {
		shapes = append(shapes, helper.FunctionShapePoint{
			Utilization: int64(point.Utilization),
			// MaxCustomPriorityScore may diverge from the max score used in the scheduler and defined by MaxNodeScore,
			// therefore we need to scale the score returned by requested to capacity ratio to the score range
			// used by the scheduler.
			Score: int64(point.Score) * (framework.MaxNodeScore / config.MaxCustomPriorityScore),
		})
	}

	return buildRequestedToCapacityRatioScorerFunction(shapes, resources)
}

// FunctionShapePoint represents a shape point.
type FunctionShapePoint struct {
	// Utilization is function argument.
	Utilization int64
	// Score is function value.
	Score int64
}

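// Builds a so-called "broken linear function".
// The function consists of several linear segments that meet at the given utilization points; its value depends on which segment the input falls into.
// The FunctionShapePoint struct represents one shape point with two fields: Utilization and Score.
// Utilization is the function argument (the resource utilization), and Score is the function value (the score computed for that utilization).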

// BuildBrokenLinearFunction creates a function which is built using linear segments. Segments are defined via shape array.
// Shape[i].Utilization slice represents points on "Utilization" axis where different segments meet.
// Shape[i].Score represents function values at meeting points.

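// BuildBrokenLinearFunction takes a shape array and returns a function that computes a score from an input utilization:
// if the utilization is at or below shape[0].Utilization, it returns shape[0].Score;
// if the utilization is above shape[n-1].Utilization, it returns shape[n-1].Score;
// if the utilization lies between shape[i-1].Utilization and shape[i].Utilization, it returns the linearly interpolated score.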

// function f(p) is defined as:
//	shape[0].Score for p < shape[0].Utilization
//	shape[n-1].Score for p > shape[n-1].Utilization
//
// and linear between points (p < shape[i].Utilization)
func BuildBrokenLinearFunction(shape FunctionShape) func(int64) int64 {
	return func(p int64) int64 {
		for i := 0; i < len(shape); i++ {
			if p <= int64(shape[i].Utilization) {
				if i == 0 {
					return shape[0].Score
				}
				return shape[i-1].Score + (shape[i].Score-shape[i-1].Score)*(p-shape[i-1].Utilization)/(shape[i].Utilization-shape[i-1].Utilization)
			}
		}
		return shape[len(shape)-1].Score
	}
}
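Putting requestedToCapacityRatioScorer and BuildBrokenLinearFunction together: the sketch below restates the logic in standalone form, assuming MaxNodeScore = 100 and config.MaxCustomPriorityScore = 10 (so user scores are multiplied by 10); shapePoint and ratioScore are hypothetical names. With the bin-packing shape {0%: 0, 100%: 10}, a node at 75% CPU and 50% memory scores round((75 + 50)/2) = 63. Note the resourceScore > 0 check: a resource whose score is 0 is dropped from the weighted average entirely, so a node at 0% CPU and 50% memory scores 50, not 25.

```go
package main

import "math"

// shapePoint mirrors helper.FunctionShapePoint.
type shapePoint struct {
	Utilization int64
	Score       int64
}

// brokenLinear restates BuildBrokenLinearFunction above.
func brokenLinear(shape []shapePoint) func(int64) int64 {
	return func(p int64) int64 {
		for i := 0; i < len(shape); i++ {
			if p <= shape[i].Utilization {
				if i == 0 {
					return shape[0].Score
				}
				return shape[i-1].Score + (shape[i].Score-shape[i-1].Score)*(p-shape[i-1].Utilization)/(shape[i].Utilization-shape[i-1].Utilization)
			}
		}
		return shape[len(shape)-1].Score
	}
}

// ratioScore rescales the user shape, scores each resource by utilization
// (requested*100/capacity), and averages the non-zero scores by weight.
func ratioScore(userShape []shapePoint, requested, allocatable, weights []int64) int64 {
	const maxNodeScore, maxCustomPriorityScore = 100, 10 // assumed constants
	scaled := make([]shapePoint, 0, len(userShape))
	for _, p := range userShape {
		scaled = append(scaled, shapePoint{p.Utilization, p.Score * (maxNodeScore / maxCustomPriorityScore)})
	}
	f := brokenLinear(scaled)
	var nodeScore, weightSum int64
	for i := range requested {
		if allocatable[i] == 0 {
			continue
		}
		var s int64
		if requested[i] > allocatable[i] {
			s = f(100) // overcommitted: treat as 100% utilization
		} else {
			s = f(requested[i] * 100 / allocatable[i])
		}
		if s > 0 {
			nodeScore += s * weights[i]
			weightSum += weights[i]
		}
	}
	if weightSum == 0 {
		return 0
	}
	return int64(math.Round(float64(nodeScore) / float64(weightSum)))
}
```

Reversing the shape (for example {0%: 10, 100%: 0}) turns the same machinery into a spreading policy.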

imagelocality

// NewInTreeRegistry builds the registry with all the in-tree plugins.
// A scheduler that runs out of tree plugins can register additional plugins
// through the WithFrameworkOutOfTreeRegistry option.
func NewInTreeRegistry() runtime.Registry {
	registry := runtime.Registry{
		......
		imagelocality.Name:                   imagelocality.New,
		......
	}
	return registry
}

ImageLocality is a score plugin that favors nodes that already have the container images requested by the pod.

// The two thresholds are used as bounds for the image score range. They correspond to a reasonable size range for
// container images compressed and stored in registries; 90%ile of images on dockerhub drops into this range.

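// These two thresholds bound the image score range.
// They correspond to a reasonable size range for container images compressed and stored in registries:
// about 90% of the images on Docker Hub fall into this range (23MB to 1000MB).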
const (
	mb                    int64 = 1024 * 1024
	minThreshold          int64 = 23 * mb
	maxContainerThreshold int64 = 1000 * mb
)

// ImageLocality is a score plugin that favors nodes that already have requested pod container's images.
type ImageLocality struct {
	handle framework.Handle
}

var _ framework.ScorePlugin = &ImageLocality{}

// Name is the name of the plugin used in the plugin registry and configurations.
const Name = names.ImageLocality

// Name returns name of the plugin. It is used in logs, etc.
func (pl *ImageLocality) Name() string {
	return Name
}

// Score invoked at the score extension point.
func (pl *ImageLocality) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
	nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
	if err != nil {
		return 0, framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", nodeName, err))
	}

	nodeInfos, err := pl.handle.SnapshotSharedLister().NodeInfos().List()
	if err != nil {
		return 0, framework.AsStatus(err)
	}
	totalNumNodes := len(nodeInfos)
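	// Given the total score of the requested images on this node (sumScores), the node's priority is obtained by scaling the maximum priority value by a ratio proportional to that total.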
	imageScores := sumImageScores(nodeInfo, pod, totalNumNodes)
	score := calculatePriority(imageScores, len(pod.Spec.InitContainers)+len(pod.Spec.Containers))

	return score, nil
}

// ScoreExtensions of the Score plugin.
func (pl *ImageLocality) ScoreExtensions() framework.ScoreExtensions {
	return nil
}

// New initializes a new plugin and returns it.
func New(_ context.Context, _ runtime.Object, h framework.Handle) (framework.Plugin, error) {
	return &ImageLocality{handle: h}, nil
}

// calculatePriority returns the priority of a node. Given the sumScores of requested images on the node, the node's
// priority is obtained by scaling the maximum priority value with a ratio proportional to the sumScores.
func calculatePriority(sumScores int64, numContainers int) int64 {
	// calculatePriority first computes a maximum threshold (maxThreshold): the number of containers (numContainers) multiplied by the per-container maximum (maxContainerThreshold).
	// It then clamps the total score (sumScores) into the range [minThreshold, maxThreshold].
	maxThreshold := maxContainerThreshold * int64(numContainers)
	if sumScores < minThreshold {
		sumScores = minThreshold
	} else if sumScores > maxThreshold {
		sumScores = maxThreshold
	}
	// Finally, it uses the clamped total to compute the ratio (sumScores - minThreshold) / (maxThreshold - minThreshold)
	// and scales the maximum node score (framework.MaxNodeScore) by that ratio to obtain the final node priority.
	return framework.MaxNodeScore * (sumScores - minThreshold) / (maxThreshold - minThreshold)
}
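The arithmetic can be checked with a standalone copy of calculatePriority. This sketch replaces framework.MaxNodeScore with a local constant of 100 (its value in the scheduler framework) and keeps the same thresholds; the image sizes passed in are made-up examples.

```go
package main

import "fmt"

const (
	mb                    int64 = 1024 * 1024
	minThreshold          int64 = 23 * mb
	maxContainerThreshold int64 = 1000 * mb
	// maxNodeScore stands in for framework.MaxNodeScore.
	maxNodeScore int64 = 100
)

// calculatePriority is the same logic as above, using the local maxNodeScore.
func calculatePriority(sumScores int64, numContainers int) int64 {
	maxThreshold := maxContainerThreshold * int64(numContainers)
	if sumScores < minThreshold {
		sumScores = minThreshold
	} else if sumScores > maxThreshold {
		sumScores = maxThreshold
	}
	return maxNodeScore * (sumScores - minThreshold) / (maxThreshold - minThreshold)
}

func main() {
	fmt.Println(calculatePriority(10*mb, 1))   // below minThreshold -> 0
	fmt.Println(calculatePriority(500*mb, 1))  // 100*477/977 -> 48
	fmt.Println(calculatePriority(500*mb, 2))  // maxThreshold is 2000MB -> 100*477/1977 -> 24
	fmt.Println(calculatePriority(5000*mb, 1)) // clamped to maxThreshold -> 100
}
```

The same 500MB of locally cached images scores roughly half as much for a two-container Pod, because maxThreshold grows with the container count.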

// sumImageScores returns the sum of image scores of all the containers that are already on the node.
// Each image receives a raw score of its size, scaled by scaledImageScore. The raw scores are later used to calculate
// the final score.

// sumImageScores iterates over the Pod's init containers (InitContainers) and regular containers (Containers).
// For each container it checks whether the container's image already exists on the target node;
// if it does, it calls scaledImageScore to compute that image's score and adds it to the total.
func sumImageScores(nodeInfo *framework.NodeInfo, pod *v1.Pod, totalNumNodes int) int64 {
	var sum int64
	for _, container := range pod.Spec.InitContainers {
		if state, ok := nodeInfo.ImageStates[normalizedImageName(container.Image)]; ok {
			sum += scaledImageScore(state, totalNumNodes)
		}
	}
	for _, container := range pod.Spec.Containers {
		if state, ok := nodeInfo.ImageStates[normalizedImageName(container.Image)]; ok {
			sum += scaledImageScore(state, totalNumNodes)
		}
	}
	return sum
}

// The score is based on the image size, scaled by the fraction of nodes that already have the image, so both the image size and its spread across nodes affect the final score.
// This helps the scheduler prefer nodes that already hold the required images, which reduces image pull time and speeds up Pod startup.
// scaledImageScore returns an adaptively scaled score for the given state of an image.
// The size of the image is used as the base score, scaled by a factor which considers how many nodes the image has "spread" to.
// This heuristic aims to mitigate the undesirable "node heating problem", i.e., pods get assigned to the same or
// a few nodes due to image locality.
func scaledImageScore(imageState *framework.ImageStateSummary, totalNumNodes int) int64 {
	spread := float64(imageState.NumNodes) / float64(totalNumNodes)
	return int64(float64(imageState.Size) * spread)
}
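The effect of the spread factor can be illustrated with a simplified sketch. imageStateSummary and the map passed to sumImageScores are hypothetical stand-ins for framework.ImageStateSummary (using only its Size and NumNodes fields) and nodeInfo.ImageStates; the image names and sizes are invented for the example.

```go
package main

import "fmt"

const mb int64 = 1024 * 1024

// imageStateSummary is a simplified stand-in: Size in bytes, NumNodes = number of nodes holding the image.
type imageStateSummary struct {
	Size     int64
	NumNodes int
}

// scaledImageScore follows the logic above: image size times the fraction of nodes holding the image.
func scaledImageScore(s imageStateSummary, totalNumNodes int) int64 {
	spread := float64(s.NumNodes) / float64(totalNumNodes)
	return int64(float64(s.Size) * spread)
}

// sumImageScores is simplified: nodeImages plays the role of nodeInfo.ImageStates and
// images holds the (already normalized) image names of all the Pod's containers.
func sumImageScores(nodeImages map[string]imageStateSummary, images []string, totalNumNodes int) int64 {
	var sum int64
	for _, name := range images {
		if s, ok := nodeImages[name]; ok {
			sum += scaledImageScore(s, totalNumNodes)
		}
	}
	return sum
}

func main() {
	const totalNodes = 10
	node := map[string]imageStateSummary{
		"nginx:1.25": {Size: 200 * mb, NumNodes: 10}, // on every node: full size counts
		"bigapp:v1":  {Size: 800 * mb, NumNodes: 2},  // on 2 of 10 nodes: scaled to 20%
	}
	pod := []string{"nginx:1.25", "bigapp:v1", "sidecar:v2"} // sidecar:v2 is not on this node
	fmt.Println(sumImageScores(node, pod, totalNodes) / mb)  // 200 + 160 = 360
}
```

An 800MB image present on only 2 of 10 nodes contributes just 160MB, which weakens the pull toward the few nodes that happen to cache a large image.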

// normalizedImageName returns the CRI compliant name for a given image.
// TODO: cover the corner cases of missed matches, e.g,
// 1. Using Docker as runtime and docker.io/library/test:tag in pod spec, but only test:tag will present in node status
// 2. Using the implicit registry, i.e., test:tag or library/test:tag in pod spec but only docker.io/library/test:tag
// in node status; note that if users consistently use one registry format, this should not happen.
func normalizedImageName(name string) string {
	if strings.LastIndex(name, ":") <= strings.LastIndex(name, "/") {
		name = name + ":latest"
	}
	return name
}
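A quick sketch of which names normalizedImageName rewrites; the function is copied as-is and the image names are only examples.

```go
package main

import (
	"fmt"
	"strings"
)

// normalizedImageName is copied from above: append ":latest" when the last ':' is not after the last '/'.
func normalizedImageName(name string) string {
	if strings.LastIndex(name, ":") <= strings.LastIndex(name, "/") {
		name = name + ":latest"
	}
	return name
}

func main() {
	for _, img := range []string{
		"nginx",                   // no tag -> nginx:latest
		"nginx:1.25",              // already tagged -> unchanged
		"localhost:5000/nginx",    // ':' belongs to the registry port -> localhost:5000/nginx:latest
		"docker.io/library/nginx", // -> docker.io/library/nginx:latest
	} {
		fmt.Println(normalizedImageName(img))
	}
}
```

The first and last cases illustrate the TODO above: `nginx` normalizes to `nginx:latest`, which still does not match a node that reports `docker.io/library/nginx:latest`.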


out-of-tree

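https://github.com/kubernetes-sigs/scheduler-plugins is an out-of-tree plugin implementation based on the Scheduler Framework. scheduler-plugins provides scheduler plugins used in large Kubernetes clusters; they can be imported as a Golang SDK library or used out of the box through prebuilt images or Helm charts. The repository also collects best practices and utilities for writing high-quality scheduler plugins.

The kube-scheduler binary built from this repository includes a set of plugins (listed in the repository README) that can be configured by creating one or more scheduler profiles.

In addition, the binary includes a list of sample plugins that are not recommended for production use.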

Network-Aware

The KEP design document: https://github.com/kubernetes-sigs/scheduler-plugins/pull/282.

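Applications such as the Internet of Things (IoT), multi-tier web services and video streaming services benefit most from network-aware scheduling policies, which consider latency and bandwidth in addition to the default resources (CPU and memory) used by the scheduler.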

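The proposal introduces two custom resources (CRs), defined as CustomResourceDefinitions (CRDs):

  • AppGroup CRD: abstracts the service topology to maintain the dependencies between an application's microservices;
  • NetworkTopology CRD: abstracts the network infrastructure to establish network weights between the regions and zones of the cluster;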

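As a result, both the application topology and the infrastructure network topology are considered during scheduling. The proposal also introduces a bandwidth resource component (a DaemonSet) that advertises bandwidth as an extended resource, so that existing filter/score plugins (e.g. PodFitsResources, BalancedAllocation) take bandwidth allocation into account.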



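The proposal adds two plugins:

  • TopologicalSort (QueueSort): sorts Pods by their topological order. Workload dependencies are defined in the AppGroup CRD, the AppGroup controller computes the preferred order for scheduling those workloads with a topological sorting algorithm, and the plugin favors Pods with lower indexes;
  • NetworkOverhead (Filter & Score): filters out nodes that cannot satisfy the network requirements of the Pod's AppGroup, and scores the remaining nodes by network weights so that network latency between Pods of the same application is minimized;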


TopologicalSort Plugin (QueueSort)

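The TopologicalSort (QueueSort) plugin sorts Pods according to the microservice dependencies defined in their AppGroup (https://github.com/diktyo-io/appgroup-api), using topological sorting (https://en.wikipedia.org/wiki/Topological_sorting).

If both Pods belong to the same AppGroup, they are sorted by its topology information: the TopologicalSort plugin compares the Pods' indexes stored in the AppGroup CR to obtain the preferred order. If a Pod does not belong to an AppGroup, or the two Pods belong to different AppGroups, the plugin falls back to the default Less function (in the source code below, that of the in-tree PrioritySort plugin).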

// Less is the function used by the activeQ heap algorithm to sort pods.
// Sort Pods based on their App Group and corresponding service topology.
// Otherwise, follow the strategy of the QueueSort Plugin
func (ts *TopologicalSort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
    // 1) Check if both pods belong to an AppGroup
    (...)
    // 2) If one of them does not belong -> Return: Follow the Less function of the QoS Sort plugin.
    (...)
    // 3) Check if both pods belong to the same AppGroup
    (...)
    // 4) If Pods belong to the same App Group -> Get AppGroup from AppGroup lister
    (...)
        // 4.1) Binary search to find both order indexes since topology list is ordered by Workload Name
        (...)
        // 4.2) Return: a lower index is better, thus invert result!
        return !(order(pInfo1) > order(pInfo2))
    // 5) Pods do not belong to the same App Group: return and follow the strategy from the QoS plugin
    (...)
}

TopologicalSort source code walkthrough:

const (
	// Name : name of plugin used in the plugin registry and configurations.
	Name = "TopologicalSort"
)

var scheme = runtime.NewScheme()

func init() {
	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
	utilruntime.Must(agv1alpha.AddToScheme(scheme))
}

// TopologicalSort : Sort pods based on their AppGroup and corresponding microservice dependencies
type TopologicalSort struct {
	client.Client
	handle     framework.Handle
	namespaces []string
}

var _ framework.QueueSortPlugin = &TopologicalSort{}

// Name : returns the name of the plugin.
func (ts *TopologicalSort) Name() string {
	return Name
}

// getArgs : returns the arguments for the TopologicalSort plugin.
func getArgs(obj runtime.Object) (*pluginconfig.TopologicalSortArgs, error) {
	TopologicalSortArgs, ok := obj.(*pluginconfig.TopologicalSortArgs)
	if !ok {
		return nil, fmt.Errorf("want args to be of type TopologicalSortArgs, got %T", obj)
	}

	return TopologicalSortArgs, nil
}

// New : create an instance of a TopologicalSort plugin
func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
	klog.V(4).InfoS("Creating new instance of the TopologicalSort plugin")

	// Parse the plugin arguments (from pluginConfig in the scheduler configuration)
	args, err := getArgs(obj)
	if err != nil {
		return nil, err
	}

	client, err := client.New(handle.KubeConfig(), client.Options{
		Scheme: scheme,
	})
	if err != nil {
		return nil, err
	}

	pl := &TopologicalSort{
		Client:     client,
		handle:     handle,
		namespaces: args.Namespaces,
	}
	return pl, nil
}

// Less is the function used by the activeQ heap algorithm to sort pods.
// 1) Sort Pods based on their AppGroup and corresponding service topology graph.
// 2) Otherwise, follow the strategy of the in-tree QueueSort Plugin (PrioritySort Plugin)
func (ts *TopologicalSort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
	p1AppGroup := networkawareutil.GetPodAppGroupLabel(pInfo1.Pod)
	p2AppGroup := networkawareutil.GetPodAppGroupLabel(pInfo2.Pod)

	// If pods do not belong to an AppGroup, or belong to different AppGroups, fall back to the in-tree PrioritySort plugin
	if p1AppGroup != p2AppGroup || len(p1AppGroup) == 0 {
		klog.V(4).InfoS("Pods do not belong to the same AppGroup CR", "p1AppGroup", p1AppGroup, "p2AppGroup", p2AppGroup)
		s := &queuesort.PrioritySort{}
		return s.Less(pInfo1, pInfo2)
	}

	// Pods belong to the same appGroup, get the CR
	klog.V(6).InfoS("Pods belong to the same AppGroup CR", "p1 name", pInfo1.Pod.Name, "p2 name", pInfo2.Pod.Name, "appGroup", p1AppGroup)
	agName := p1AppGroup
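	appGroup := ts.findAppGroupTopologicalSort(agName)
	if appGroup == nil {
		// findAppGroupTopologicalSort returns nil when the AppGroup CR is not found in any configured
		// namespace; fall back to PrioritySort instead of dereferencing a nil pointer below.
		s := &queuesort.PrioritySort{}
		return s.Less(pInfo1, pInfo2)
	}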

	// Get labels from both pods
	labelsP1 := pInfo1.Pod.GetLabels()
	labelsP2 := pInfo2.Pod.GetLabels()

	// Binary search to find both order index since topology list is ordered by Workload Name
	orderP1 := networkawareutil.FindPodOrder(appGroup.Status.TopologyOrder, labelsP1[agv1alpha.AppGroupSelectorLabel])
	orderP2 := networkawareutil.FindPodOrder(appGroup.Status.TopologyOrder, labelsP2[agv1alpha.AppGroupSelectorLabel])

	klog.V(6).InfoS("Pod order values", "p1 order", orderP1, "p2 order", orderP2)

	// Lower is better
	return orderP1 <= orderP2
}

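// findAppGroupTopologicalSort returns the AppGroup with the given name from the first configured namespace in which it is found.
// If no configured namespace contains an AppGroup with that name, it returns nil.
func (ts *TopologicalSort) findAppGroupTopologicalSort(agName string) *agv1alpha.AppGroup {
	klog.V(6).InfoS("Searching AppGroup in namespaces", "namespaces", ts.namespaces)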
	for _, namespace := range ts.namespaces {
		klog.V(6).InfoS("appGroup CR", "namespace", namespace, "name", agName)
		// AppGroup couldn't be placed in several namespaces simultaneously
		appGroup := &agv1alpha.AppGroup{}
		err := ts.Get(context.TODO(), client.ObjectKey{
			Namespace: namespace,
			Name:      agName,
		}, appGroup)
		if err != nil {
			klog.V(4).InfoS("Cannot get AppGroup from AppGroupNamespaceLister:", "error", err)
			continue
		}
		if appGroup != nil {
			return appGroup
		}
	}
	return nil
}

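// FindPodOrder : return the order index of the given pod
// It looks up the workload with the given selector in an AppGroupTopologyList and returns its order index.
// It uses binary search, so the list must be sorted by Workload.Selector, and each element must have a unique Selector and an Index.
// The selector of the middle element is compared with the given selector: if they are equal, the middle element's Index is returned;
// if the middle selector is smaller, the search continues in the right half of the list (higher positions);
// if it is larger, the search continues in the left half (lower positions).
// It returns -1 if the selector is not found.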
func FindPodOrder(t agv1alpha1.AppGroupTopologyList, selector string) int32 {
	low := 0
	high := len(t) - 1

	for low <= high {
		mid := (low + high) / 2
		if t[mid].Workload.Selector == selector {
			return t[mid].Index // Return the index
		} else if t[mid].Workload.Selector < selector {
			low = mid + 1
		} else {
			high = mid - 1
		}
	}
	return -1
}

An example AppGroup CR:

# Example App Group CRD spec
apiVersion: scheduling.sigs.x-k8s.io/v1alpha1
kind: AppGroup
metadata:
  name: a1
spec:
  numMembers: 3
  topologySortingAlgorithm: KahnSort
  workloads: 
    - workload:
        kind: Deployment
        apiVersion: apps/v1
        namespace: default
        name: P1
      dependencies:
        - workload: 
            kind: Deployment
            apiVersion: apps/v1
            namespace: default
            name: P2
          minBandwidth: "100Mi"
          maxNetworkCost: 30
    - workload: 
        kind: Deployment
        apiVersion: apps/v1
        namespace: default
        name: P2
      dependencies:
        - workload:
            kind: Deployment
            apiVersion: apps/v1
            namespace: default
            name: P3
          minBandwidth: "250Mi"
          maxNetworkCost: 20
    - workload:
        kind: Deployment
        apiVersion: apps/v1
        namespace: default
        name: P3

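Consider a larger example: an AppGroup made up of 11 workloads (11 groups of Pods) whose topological order, computed with the KahnSort algorithm, is P1, P10, P9, P8, P7, P6, P5, P4, P3, P2, P11.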
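The AppGroup YAML above declares P1 -> P2 -> P3 dependencies with topologySortingAlgorithm: KahnSort. A textbook Kahn's algorithm over such dependencies can be sketched as follows. It assumes each dependency is an edge from a workload to the workload it depends on (consistent with P1 coming first in the 11-workload order), breaks ties among the initial workloads alphabetically, and uses 1-based indexes; the AppGroup controller's real implementation may differ in these details, and kahnSort is a hypothetical name.

```go
package main

import (
	"fmt"
	"sort"
)

// kahnSort returns a topological order of the workloads. deps[w] lists the workloads w depends on,
// each treated as an edge w -> dep. All dependencies are assumed to appear in workloads.
func kahnSort(workloads []string, deps map[string][]string) ([]string, error) {
	inDegree := make(map[string]int, len(workloads))
	for _, w := range workloads {
		inDegree[w] = 0
	}
	for _, w := range workloads {
		for _, d := range deps[w] {
			inDegree[d]++
		}
	}
	// Start with every workload that has no incoming edge; sort them for a deterministic result.
	var queue []string
	for _, w := range workloads {
		if inDegree[w] == 0 {
			queue = append(queue, w)
		}
	}
	sort.Strings(queue)

	order := make([]string, 0, len(workloads))
	for len(queue) > 0 {
		w := queue[0]
		queue = queue[1:]
		order = append(order, w)
		// Removing w's outgoing edges may free its dependencies.
		for _, d := range deps[w] {
			inDegree[d]--
			if inDegree[d] == 0 {
				queue = append(queue, d)
			}
		}
	}
	if len(order) != len(workloads) {
		return nil, fmt.Errorf("dependency cycle detected")
	}
	return order, nil
}

func main() {
	// Dependencies from the AppGroup example above: P1 depends on P2, P2 depends on P3.
	order, err := kahnSort([]string{"P1", "P2", "P3"}, map[string][]string{"P1": {"P2"}, "P2": {"P3"}})
	if err != nil {
		panic(err)
	}
	for i, w := range order {
		fmt.Printf("%s -> index %d\n", w, i+1)
	}
}
```

A cycle leaves some workloads with a non-zero in-degree, so they never enter the queue and the length check reports the error.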


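The plugin favors lower indexes. When the Less function compares two Pods of this AppGroup, it returns true (the first Pod is dequeued first) if the first Pod's index is lower than or equal to the second's; for example, a P1 Pod goes before a P10 Pod, and a P10 Pod goes before a P2 Pod.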


NetworkOverhead Plugin (Filter & Score)

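The NetworkOverhead Filter & Score plugin filters nodes based on the microservice dependencies defined in the AppGroup (https://github.com/diktyo-io/appgroup-api) and gives higher scores to nodes with lower network costs (as described in the NetworkTopology CR), enabling latency-aware scheduling.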

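Network topology considerations:

  • Workload dependencies established in the AppGroup CR;
  • Nodes that would incur a higher network cost than the maxNetworkCost requirement must be filtered out;
  • Several dependencies may exist, since multiple Pods may already be deployed in the network;
  • Nodes that fail to satisfy most of the dependencies are filtered out, reducing the number of nodes to be scored;
  • minBandwidth requirements are considered at a later stage;
  • Account for the bandwidth capacity / allocatable bandwidth available in each region/zone;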

NetworkTopology CR example:

# Example Network CRD
apiVersion: scheduling.sigs.x-k8s.io/v1alpha1
kind: NetworkTopology
metadata:
  name: net-topology-test
  namespace: default
spec:
  configMapName: "netperfMetrics"
  weights:
    # Region label: "topology.kubernetes.io/region"
    # Zone Label:   "topology.kubernetes.io/zone"
    # 2 Regions:  us-west-1
    #             us-east-1
    # 4 Zones:    us-west-1: z1, z2
    #             us-east-1: z3, z4
    - name: "UserDefined"
      costList: # Define weights between regions or between zones 
        - topologyKey: "topology.kubernetes.io/region" # region costs
          originCosts:
            - origin: "us-west-1"
              costs:
                - destination: "us-east-1"
                  bandwidthCapacity: "10Gi"
                  networkCost: 20
            - origin: "us-east-1"
              costs:
                - destination: "us-west-1"
                  bandwidthCapacity: "10Gi"
                  networkCost: 20
        - topologyKey: "topology.kubernetes.io/zone" # zone costs
          originCosts:
            - origin: "z1"
              costs:
                - destination: "z2"
                  bandwidthCapacity: "1Gi"
                  networkCost: 5
            - origin: "z2"
              costs:
                - destination: "z1"
                  bandwidthCapacity: "1Gi"
                  networkCost: 5
            - origin: "z3"
              costs:
                - destination: "z4"
                  bandwidthCapacity: "1Gi"
                  networkCost: 10
            - origin: "z4"
              costs:
                - destination: "z3"
                  bandwidthCapacity: "1Gi"
                  networkCost: 10

NetworkOverhead source code walkthrough (Filter):

// Filter : evaluate if node can respect maxNetworkCost requirements
func (pl *NetworkOverhead) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    // 1) Check if Pod belongs to an AppGroup
    (...)
    // 2) Check if pods already available (AppGroup lister), otherwise return
    (...)
    // 3) Check Dependencies of the given pod 
    (...)
    // 4) Retrieve network costs from the NetworkTopology CRD based on the region and zone of the node being filtered    
    (...)
    // 5) Save them in a map to search for costs faster
    (...)
    // 6) Main Procedure: check if the node is able to meet maxNetworkCost requirements
        // For Loop: check all workloads allocated in the cluster and see if dependencies are met if pod is allocated on the node
        (...) // If the node being filtered and the pod's hostname is the same node -> numOK = numOK + 1 (dependency respected)
        (...) // If Nodes belong to the same zone -> numOK = numOK + 1 (dependency respected)
        (...) // Otherwise, retrieve the cost from the map:  
        (...) // If the cost (retrieved from map) <= dependency MaxNetworkCost -> numOK = numOK + 1 (dependency respected)             
        (...) // Otherwise: (cost > dependency MaxNetworkCost) -> numNotOK = numNotOK + 1 (dependency not respected)

    // 7) If the number of unmet dependencies is higher than the number of respected dependencies: return framework.Unschedulable
    if numNotOK > numOK {
        return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("Node %v does not meet several network requirements from Pod dependencies: OK: %v NotOK: %v", nodeInfo.Node().Name, numOK, numNotOK))
    }
    // 8) Otherwise, node can schedule the pod: return nil
    return nil
}
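Steps 6 and 7 of Filter can be sketched without the scheduler framework. Node, Dependency, costKey, countDependencies and fits are hypothetical names; using region costs when the regions differ and zone costs otherwise, and treating a missing cost entry as an unmet dependency, are assumptions rather than the plugin's exact code. The costs are taken from the NetworkTopology example above.

```go
package main

import "fmt"

// Node is a simplified node with its topology labels.
type Node struct {
	Name, Region, Zone string
}

// Dependency: a peer Pod already running on PeerNode, with MaxNetworkCost from the AppGroup CR.
type Dependency struct {
	PeerNode       Node
	MaxNetworkCost int64
}

// costKey indexes the cost maps built from the NetworkTopology CR (step 5).
type costKey struct{ Origin, Destination string }

// countDependencies sketches step 6: count met / unmet dependencies if the Pod lands on candidate.
func countDependencies(candidate Node, deps []Dependency, regionCosts, zoneCosts map[costKey]int64) (numOK, numNotOK int) {
	for _, d := range deps {
		switch {
		case d.PeerNode.Name == candidate.Name: // same node
			numOK++
		case d.PeerNode.Zone == candidate.Zone: // same zone
			numOK++
		default:
			costs, origin, dest := zoneCosts, candidate.Zone, d.PeerNode.Zone
			if d.PeerNode.Region != candidate.Region {
				costs, origin, dest = regionCosts, candidate.Region, d.PeerNode.Region
			}
			// Assumption: a missing cost entry counts as an unmet dependency.
			if cost, ok := costs[costKey{origin, dest}]; ok && cost <= d.MaxNetworkCost {
				numOK++
			} else {
				numNotOK++
			}
		}
	}
	return numOK, numNotOK
}

// fits sketches step 7: the node is filtered out when unmet dependencies outnumber met ones.
func fits(numOK, numNotOK int) bool { return numNotOK <= numOK }

func main() {
	regionCosts := map[costKey]int64{{"us-west-1", "us-east-1"}: 20, {"us-east-1", "us-west-1"}: 20}
	zoneCosts := map[costKey]int64{{"z1", "z2"}: 5, {"z2", "z1"}: 5, {"z3", "z4"}: 10, {"z4", "z3"}: 10}

	candidate := Node{Name: "n1", Region: "us-west-1", Zone: "z1"}
	deps := []Dependency{
		{PeerNode: Node{"n1", "us-west-1", "z1"}, MaxNetworkCost: 0},  // same node -> OK
		{PeerNode: Node{"n2", "us-west-1", "z2"}, MaxNetworkCost: 30}, // zone cost 5 <= 30 -> OK
		{PeerNode: Node{"n3", "us-east-1", "z3"}, MaxNetworkCost: 10}, // region cost 20 > 10 -> not OK
	}
	ok, notOK := countDependencies(candidate, deps, regionCosts, zoneCosts)
	fmt.Println(ok, notOK, fits(ok, notOK)) // 2 1 true
}
```

With two dependencies met and one violated, numNotOK > numOK is false, so this node would pass the filter.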

Scheduler Config example

An example scheduler configuration for these plugins:

apiVersion: kubescheduler.config.k8s.io/v1beta3
kind: KubeSchedulerConfiguration
leaderElection:
  leaderElect: false
clientConnection:
  kubeconfig: "REPLACE_ME_WITH_KUBE_CONFIG_PATH"
profiles:
- schedulerName: network-aware-scheduler
  plugins:
    queueSort:
      enabled:
      - name: TopologicalSort
      disabled:
      - name: "*"
    filter:
      enabled:
      - name: NetworkOverhead
    score:
      disabled: # Preferably avoid the combination of NodeResourcesFit with NetworkOverhead
      - name: NodeResourcesFit
      enabled: # A higher weight is given to NetworkOverhead to favor allocation schemes with lower latency.
      - name: NetworkOverhead
        weight: 5
      - name: NodeResourcesBalancedAllocation # in-tree plugin name
        weight: 1
  pluginConfig:
  - name: TopologicalSort
    args:
      namespaces:
      - "default"
  - name: NetworkOverhead
    args:
      namespaces:
      - "default"
      weightsName: "UserDefined" # weights applied by the plugin
      networkTopologyName: "net-topology-test" # networkTopology CR used by the plugin

Network-aware design proposal

For more on network-aware scheduling, see https://github.com/kubernetes-sigs/scheduler-plugins/tree/master/kep/260-network-aware-scheduling

Other examples

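| Example | Main functionality | Design proposal | Notes |
| --- | --- | --- | --- |
| Capacity Scheduling | There is growing demand to use Kubernetes to manage batch workloads (ML/DL). One challenge is to improve cluster utilization while ensuring that each user gets a reasonable amount of resources. Kubernetes ResourceQuota partially addresses this: the native ResourceQuota API specifies the maximum overall resource allocation per namespace, and quota is enforced through an admission check, so a quota consumer (e.g. a Pod) cannot be created if the aggregated resource allocation would exceed the quota limit. In other words, overall resource usage is aggregated from the Pod spec (i.e. cpu/mem requests) when the Pod is created. The limitation of this design is that quota usage is aggregated from the resource configuration (the cpu/mem requests in the Pod spec). Although this guarantees that actual consumption never exceeds the ResourceQuota limit, it can lead to low utilization, because some Pods may have claimed resources but failed to be scheduled, so actual consumption may be far below the limit. As a result, batch workloads (ML/DL) may run less efficiently on Kubernetes than on other platforms such as Yarn. To overcome this, the "ElasticQuota" concept from the Yarn capacity scheduler can be brought into Kubernetes; an "ElasticQuota" has both a "max" and a "min". | https://github.com/kubernetes-sigs/scheduler-plugins/tree/master/kep/9-capacity-scheduling | |
| Coscheduling | With the Kubernetes default scheduler, there is currently no way to ensure that a group of Pods is scheduled all together. Because some applications cannot run with only part of their Pods (e.g. Spark jobs, TensorFlow jobs), partial scheduling wastes resources. This proposal introduces a PodGroup CRD that ties a group of Pods together to solve this problem. | https://github.com/kubernetes-sigs/scheduler-plugins/blob/master/pkg/coscheduling/README.md | |
| Node Resources | Resources are weighted according to the resource arguments in the plugin args. The base unit for CPU is millicores, and for memory it is bytes. | https://github.com/kubernetes-sigs/scheduler-plugins/blob/master/pkg/noderesources/README.md | |
| Node Resource Topology | Since the Topology Manager was introduced, launching Pods has become a practical problem in clusters whose worker nodes have different NUMA topologies and different amounts of resources in those topologies. A Pod may be scheduled on a node with enough resources in total whose resource distribution nevertheless cannot satisfy the corresponding topology policy; the Pod then fails to start. A better behavior is for the scheduler to pick a suitable node on which the kubelet admit handler will pass. | https://github.com/kubernetes-sigs/scheduler-plugins/blob/master/pkg/noderesourcetopology/README.md | Scoring strategies: MostAllocated, BalancedAllocation, LeastAllocated, LeastNUMANodes |
| Preemption Toleration | The Kubernetes scheduler provides Pod priority and preemption. Users (typically cluster administrators) can define several PriorityClasses to classify priorities in the cluster. In clusters with little elasticity (e.g. on-premise clusters), designing priority classes and preemption rules is very important for compute utilization. A PriorityClass can set a PreemptionPolicy to customize the preemptor behavior of that class; the allowed values are PreemptLowerPriority (default) and Never. With Never, the class becomes non-preempting: it cannot preempt any Pod, but it can still be preempted by higher-priority Pods. The PreemptionPolicy API is simple and easy to understand, but it only covers preemptor-side behavior. This plugin enables more flexible preemption configuration by adding a preemptee-side (a.k.a. victim-side) policy to PriorityClass. In particular, cluster administrators can define a preemption toleration policy that specifies the criteria under which a priority class is exempted from preemption. | https://github.com/kubernetes-sigs/scheduler-plugins/blob/master/pkg/preemptiontoleration/README.md | |
| Trimaran | Kubernetes provides a declarative resource model that the core components (scheduler and kubelet) honor to stay consistent and meet QoS guarantees. However, this model can lead to low cluster utilization: it is hard for users to estimate their applications' resource usage accurately, and users may not understand the resource model and therefore not set it at all. Moreover, the default in-tree scheduling plugins (Score) do not consider live node utilization. This proposal uses real-time resource usage to schedule Pods through the proposed plugins, with the goal of increasing cluster utilization and reducing cluster management cost without breaking the contract of the Kubernetes resource model. | https://github.com/kubernetes-sigs/scheduler-plugins/blob/master/pkg/trimaran/README.md | TargetLoadPacking: a packing strategy up to a configured CPU utilization, then switching to a spreading strategy among hot nodes; supports CPU. LoadVariationRiskBalancing: balances risk across nodes, where risk is a combined measure of average utilization and utilization variation; supports CPU and memory. LowRiskOverCommitment: evaluates the performance risk of overcommitment and picks the lowest-risk node by considering (1) the Pod's resource limits (limit-aware) and (2) the actual load (utilization) on the nodes (load-aware), providing a low-risk environment that mitigates overcommitment while still letting Pods use their limits. |
| Network-Aware Scheduling | Many applications are latency-sensitive and require low latency between their microservices. When end-to-end latency is the main goal, scheduling policies aimed at reducing cost or improving resource efficiency are not enough. Applications such as IoT, multi-tier web services and video streaming benefit most from network-aware scheduling policies that consider latency and bandwidth in addition to the default resources (such as CPU and memory). Multi-tier applications, typically composed of dozens to hundreds of microservices with complex interdependencies, often suffer from latency problems, and the distance between servers is usually the main culprit. According to previous work on Service Function Chaining (SFC), the best strategy is to reduce the latency between chained microservices of the same application. Bandwidth also plays an essential role for applications with heavy data transfer between microservices: for example, database replicas may need frequent replication to keep data consistent, and Spark jobs may transfer data frequently between map and reduce nodes. Insufficient network capacity on a node can increase latency or cause packet loss, degrading the application's quality of service (QoS). | https://github.com/kubernetes-sigs/scheduler-plugins/blob/master/pkg/networkaware/README.md, https://github.com/kubernetes-sigs/scheduler-plugins/blob/master/kep/260-network-aware-scheduling/README.md | |
| Cross Node Preemption | The PostFilter extension point was introduced into the Kubernetes scheduler in version 1.19, and the default upstream implementation preempts Pods on the same node to make room for an unschedulable Pod. In contrast to this "same-node preemption" strategy, a "cross-node preemption" strategy preempts Pods across multiple nodes, which is useful when a Pod is unschedulable because of "cross-node" constraints such as PodTopologySpread and PodAntiAffinity; this was also mentioned in the original preemption design document. The plugin is built as an example that demonstrates how to use the PostFilter extension point and encourages users to build their own strategies, such as preempting a group of Pods. | https://github.com/kubernetes-sigs/scheduler-plugins/blob/master/pkg/crossnodepreemption/README.md | |
| Pod State | A score plugin that takes terminating and nominated Pods into account: nodes with more terminating Pods get a higher score, because those Pods will eventually be physically removed from the node; nodes with more nominated Pods (carrying .status.nominatedNodeName) get a lower score, because they are expected to accommodate some preemptor Pods in future scheduling cycles. | https://github.com/kubernetes-sigs/scheduler-plugins/blob/master/pkg/podstate/README.md | |
| Quality of Service | Sorts Pods by .spec.priority, breaking ties by QoS class. By default, the plugin enqueues Pods in this order: Guaranteed (requests == limits), Burstable (requests < limits), BestEffort (requests and limits not set). | https://github.com/kubernetes-sigs/scheduler-plugins/blob/master/pkg/qos/README.md | |
| disk-io-aware-scheduling | Disk IO is an important resource for guaranteeing workload performance in cloud-native environments. The current Kubernetes scheduler does not support disk-IO-aware scheduling, so Pods scheduled on the same node may compete for disk IO and degrade each other's performance (the noisy-neighbor problem). There is growing demand to add disk-IO-aware scheduling to Kubernetes to avoid or mitigate this. To support it, a scheduling plugin was added that tracks each Pod's disk IO requirements and accounts for the disk IO available on each node when making scheduling decisions. | https://github.com/kubernetes-sigs/scheduler-plugins/tree/master/kep/624-disk-io-aware-scheduling | |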


