kubelet源码学习(一):kubelet工作原理、kubelet启动过程

2023-12-24 15:33:26

本文基于Kubernetes v1.22.4版本进行源码学习

1、kubelet工作原理

1)、kubelet核心工作

kubelet的工作核心就是一个控制循环,即:SyncLoop(图中的大圆圈)。而驱动这个控制循环运行的事件,包括:Pod更新事件、Pod生命周期变化、kubelet本身设置的执行周期、定时的清理事件

kubelet还负责维护着很多其他的子控制循环(也就是图中的小圆圈),叫做xxxManager,比如:probeManager会定时去监控Pod中容器的健康状况,当前支持两种类型的探针:livenessProbe和readinessProbe;statusManager负责维护状态信息,并把Pod状态更新到APIServer;containerRefManager是容器引用的管理,用来报告容器的创建、失败等事件

2)、CRI与容器运行时

kubelet调用下层容器运行时的执行过程,并不会直接调用Docker的API,而是通过一组叫作CRI(Container Runtime Interface,容器运行时接口)的gRPC接口来间接执行的

CRI shim负责响应CRI请求,扮演kubelet与容器项目之间的垫片(shim)。CRI shim实现了CRI规定的每个接口,然后把具体的CRI请求翻译成对后端容器项目的请求或者操作

每一种容器项目都可以自己实现一个CRI shim,自行对CRI请求进行处理,这样,Kubernetes就有了一个统一的容器抽象层,使得下层容器运行时可以自由地对接进入Kubernetes当中

如果使用Docker的话,dockershim负责处理CRI请求,然后组装成Docker API请求发给Docker Daemon

CRI接口可以分为两组:

  • RuntimeService:主要是跟容器相关的操作,比如创建、启动、删除容器,执行exec命令等
  • ImageManagerService:主要是容器镜像相关的操作,比如拉取镜像、删除镜像等

CRI接口核心方法如下图:

CRD设计的一个重要原则,就是确保这个接口本身,只关注容器,不关注Pod,在CRI的设计里并没有一个直接创建Pod或者启动Pod的接口

PodSandboxManager中包含RunPodSandbox方法,这个PodSandbox对应的并不是Kubernetes里的Pod API对象,而只是抽取了Pod里的一部分与容器运行时相关的字段,比如HostName、DnsConfig、CgroupParent等。所以说,PodSandbox描述的其实是Kubernetes将Pod这个概念映射到容器运行时层面所需要的字段,或者说是一个Pod对象子集

比如,当执行kubectl run创建了一个名叫foo的、包括了A、B两个容器的Pod之后。如果是Docker项目,dockershim就会创建出一个名叫foo的Infra容器(pause容器)用来hold住整个Pod的Network Namespace

2、kubelet启动过程

pkg/kubelet/kubelet.goRun()方法启动了kubelet各个模块,代码如下:

// pkg/kubelet/kubelet.go
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	if kl.logServer == nil {
		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
	}
	if kl.kubeClient == nil {
		klog.InfoS("No API server defined - no node status update will be sent")
	}

	// Start the cloud provider sync manager
	if kl.cloudResourceSyncManager != nil {
		go kl.cloudResourceSyncManager.Run(wait.NeverStop)
	}
	// 启动不依赖container runtime的一些模块
	if err := kl.initializeModules(); err != nil {
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
		klog.ErrorS(err, "Failed to initialize internal modules")
		os.Exit(1)
	}

	// Start volume manager
	// 启动volume manager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	if kl.kubeClient != nil {
		// Start syncing node status immediately, this may set up things the runtime needs to run.
		// 定时同步node状态
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
		// 更新pod CIDR、runtime状态以及执行首次node状态同步
		go kl.fastStatusUpdateOnce()

		// start syncing lease
		// 启动node lease机制
		go kl.nodeLeaseController.Run(wait.NeverStop)
	}
	// 定时更新runtime状态
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

	// Set up iptables util rules
	if kl.makeIPTablesUtilChains {
		kl.initNetworkUtil()
	}

	// Start component sync loops.
	// 启动statusManager
	kl.statusManager.Start()

	// Start syncing RuntimeClasses if enabled.
	// 启动runtimeClassManager
	if kl.runtimeClassManager != nil {
		kl.runtimeClassManager.Start(wait.NeverStop)
	}

	// Start the pod lifecycle event generator.
	// 启动pleg,该模块主要用于周期性地向container runtime刷新当前所有容器的状态
	kl.pleg.Start()
	// 启动kubelet事件循环,不停监听外部数据的变化执行pod的相应操作
	kl.syncLoop(updates, kl)
}

Run()方法主要逻辑如下:

  1. 调用kl.initializeModules()方法,启动不依赖container runtime的一些模块
  2. 启动volume manager
  3. 定时同步node状态
  4. 调用kl.fastStatusUpdateOnce()方法,更新pod CIDR、runtime状态以及执行首次node状态同步
  5. 启动node lease机制,同步节点租约
  6. 定时执行kl.updateRuntimeUp()方法,更新runtime状态
  7. 启动statusManager、runtimeClassManager
  8. 调用kl.pleg.Start()方法,启动pleg,该模块主要用于周期性地向container runtime刷新当前所有容器的状态
  9. 调用kl.syncLoop()方法,启动kubelet事件循环,不停监听外部数据的变化执行pod的相应操作
1)、initializeModules()方法
// pkg/kubelet/kubelet.go
func (kl *Kubelet) initializeModules() error {
	// Prometheus metrics.
	metrics.Register(
		collectors.NewVolumeStatsCollector(kl),
		collectors.NewLogMetricsCollector(kl.StatsProvider.ListPodStats),
	)
	metrics.SetNodeName(kl.nodeName)
	servermetrics.Register()

	// Setup filesystem directories.
	if err := kl.setupDataDirs(); err != nil {
		return err
	}

	// If the container logs directory does not exist, create it.
	if _, err := os.Stat(ContainerLogsDir); err != nil {
		if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
			return fmt.Errorf("failed to create directory %q: %v", ContainerLogsDir, err)
		}
	}

	// Start the image manager.
	// 启动imageManager
	kl.imageManager.Start()

	// Start the certificate manager if it was enabled.
	// 启动certificateManager,证书相关
	if kl.serverCertificateManager != nil {
      kl.serverCertificateManager.Start()
	}

	// Start out of memory watcher.
	// 启动oomWatcher
	if kl.oomWatcher != nil {
		if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
			return fmt.Errorf("failed to start OOM watcher: %w", err)
		}
	}

	// Start resource analyzer
	// 启动resource analyzer,刷新volume stats到缓存中
	kl.resourceAnalyzer.Start()

	return nil
}

initializeModules()方法主要逻辑如下:

  1. 启动imageManager,实际上是realImageGCManager
  2. 启动certificateManager,证书相关
  3. 启动oomWatcher
  4. 启动resource analyzer,刷新volume stats到缓存中

kl.imageManager.Start()方法代码如下:

// pkg/kubelet/images/image_gc_manager.go
func (im *realImageGCManager) Start() {
	go wait.Until(func() {
		// Initial detection make detected time "unknown" in the past.
		var ts time.Time
		if im.initialized {
			ts = time.Now()
		}
		// 找出所有的image,并删除不再使用的image
		_, err := im.detectImages(ts)
		if err != nil {
			klog.InfoS("Failed to monitor images", "err", err)
		} else {
			im.initialized = true
		}
	}, 5*time.Minute, wait.NeverStop)

	// Start a goroutine periodically updates image cache.
	// 更新image的缓存
	go wait.Until(func() {
		// 调用CRI接口,获取最新的image
		images, err := im.runtime.ListImages()
		if err != nil {
			klog.InfoS("Failed to update image list", "err", err)
		} else {
			im.imageCache.set(images)
		}
	}, 30*time.Second, wait.NeverStop)

}

realImageGCManager的Start()方法启动两个协程

  1. 定时调用detectImages()方法,会找出所有正在使用的image,然后删除不再使用的image
  2. 定时获取最新的image,调用imageCache()方法更新image的缓存
2)、fastStatusUpdateOnce()方法
// pkg/kubelet/kubelet.go
func (kl *Kubelet) fastStatusUpdateOnce() {
	for {
		time.Sleep(100 * time.Millisecond)
		node, err := kl.GetNode()
		if err != nil {
			klog.ErrorS(err, "Error getting node")
			continue
		}
		if len(node.Spec.PodCIDRs) != 0 {
			podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
			if _, err := kl.updatePodCIDR(podCIDRs); err != nil {
				klog.ErrorS(err, "Pod CIDR update failed", "CIDR", podCIDRs)
				continue
			}
			// 更新runtime状态
			kl.updateRuntimeUp()
			// node状态同步
			kl.syncNodeStatus()
			return
		}
	}
}

fastStatusUpdateOnce()方法启动一个循环,尝试立即更新pod CIDR。更新pod CIDR后,会更新runtime状态并同步node状态。该方法在一次成功的node状态同步后直接返回,仅在kubelet启动期间执行

kl.updateRuntimeUp()方法代码如下:

// pkg/kubelet/kubelet.go
// 首次执行的时候会初始化runtime依赖模块,然后更新runtimeState
func (kl *Kubelet) updateRuntimeUp() {
	kl.updateRuntimeMux.Lock()
	defer kl.updateRuntimeMux.Unlock()
	// 获取containerRuntime状态
	s, err := kl.containerRuntime.Status()
	if err != nil {
		klog.ErrorS(err, "Container runtime sanity check failed")
		return
	}
	if s == nil {
		klog.ErrorS(nil, "Container runtime status is nil")
		return
	}
	// Periodically log the whole runtime status for debugging.
	klog.V(4).InfoS("Container runtime status", "status", s)
	networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
	// 检查network和runtime是否处于ready状态
	if networkReady == nil || !networkReady.Status {
		klog.ErrorS(nil, "Container runtime network not ready", "networkReady", networkReady)
		kl.runtimeState.setNetworkState(fmt.Errorf("container runtime network not ready: %v", networkReady))
	} else {
		// Set nil if the container runtime network is ready.
		kl.runtimeState.setNetworkState(nil)
	}
	// information in RuntimeReady condition will be propagated to NodeReady condition.
	// 获取运行时状态
	runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
	// If RuntimeReady is not set or is false, report an error.
	if runtimeReady == nil || !runtimeReady.Status {
		klog.ErrorS(nil, "Container runtime not ready", "runtimeReady", runtimeReady)
		kl.runtimeState.setRuntimeState(fmt.Errorf("container runtime not ready: %v", runtimeReady))
		return
	}
	kl.runtimeState.setRuntimeState(nil)
	// 调用kl.initializeRuntimeDependentModules初始化依赖模块
	kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
	kl.runtimeState.setRuntimeSync(kl.clock.Now())
}

updateRuntimeUp()方法会获取containerRuntime状态信息,然后根据返回containerRuntime状态检查网络、runtime是不是已经处于ready状态;接着调用kl.initializeRuntimeDependentModules()方法初始化依赖模块,这里会启动cadvisor、containerManager、evictionManager、containerLogManager、pluginManager、shutdownManager;最后设置runtime同步时间

3)、syncLoop()方法
// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	klog.InfoS("Starting kubelet main sync loop")
	// The syncTicker wakes up kubelet to checks if there are any pod workers
	// that need to be sync'd. A one-second period is sufficient because the
	// sync interval is defaulted to 10s.
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	plegCh := kl.pleg.Watch()
	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
	duration := base
	// Responsible for checking limits in resolv.conf
	// The limits do not have anything to do with individual pods
	// Since this is called in syncLoop, we don't need to call it anywhere else
	if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
		kl.dnsConfigurer.CheckLimitsForResolvConf()
	}

	for {
		if err := kl.runtimeState.runtimeErrors(); err != nil {
			klog.ErrorS(err, "Skipping pod synchronization")
			// exponential backoff
			time.Sleep(duration)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		// reset backoff if we have a success
		duration = base

		kl.syncLoopMonitor.Store(kl.clock.Now())
		// 调用kl.syncLoopIteration方法
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}

syncLoop()方法在一个循环中不断的调用syncLoopIteration()方法

关于syncLoopIteration()方法中涉及的channel后面会详细介绍,这里只关注syncLoopIteration()方法中的处理逻辑

1)configCh

// pkg/kubelet/kubelet.go
// 该方法会监听多个channel,当发现任何一个channel有数据就交给handler去处理,在handler中通过调用dispatchWork分发任务
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		// 该模块将同时watch 3个不同来源的pod信息的变化(file,http,apiServer)
		// 一旦某个来源的pod信息发生了变化(创建/更新/删除),这个channel中就会出现被更新的pod信息和更新的具体操作
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
			return false
		}

		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", format.Pods(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", format.Pods(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", format.Pods(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", format.Pods(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", format.Pods(u.Pods))
			// DELETE is treated as a UPDATE because of graceful deletion.
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.SET:
			// TODO: Do we want to support this?
			klog.ErrorS(nil, "Kubelet does not support snapshot update")
		default:
			klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
		}

		kl.sourcesReady.AddSource(u.Source)
    ...
	}
	return true
}    

configCh是读取配置事件的管道,该模块将同时watch 3个不同来源的Pod信息的变化(file、http、APIServer),一旦某个来源的Pod信息发生了变化(创建/更新/删除),这个channel中就会出现被更新的Pod信息和更新的具体操作

2)plegCh

// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	...
	case e := <-plegCh:
		if e.Type == pleg.ContainerStarted {
			// record the most recent time we observed a container start for this pod.
			// this lets us selectively invalidate the runtimeCache when processing a delete for this pod
			// to make sure we don't miss handling graceful termination for containers we reported as having started.
			kl.lastContainerStartedTime.Add(e.ID, time.Now())
		}
		if isSyncPodWorthy(e) {
			// PLEG event for a pod; sync it.
			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
				klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
				handler.HandlePodSyncs([]*v1.Pod{pod})
			} else {
				// If the pod no longer exists, ignore the event.
				klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
			}
		}

		if e.Type == pleg.ContainerDied {
			if containerID, ok := e.Data.(string); ok {
				kl.cleanUpContainersInPod(e.ID, containerID)
			}
		}
    ...
	}
	return true
}        

kl.pleg.Start()的时候会每秒钟调用一次relist,根据最新的PodStatus生成PodLiftCycleEvent,然后存入到plegCh中

syncLoop()方法中调用kl.pleg.Watch()获取plegCh,然后传给syncLoopIteration()方法,syncLoopIteration()方法中消费plegCh中的数据,在handler中通过调用dispatchWork分发任务

3)syncCh

// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	...
	case <-syncCh:
		// Sync pods waiting for sync
		podsToSync := kl.getPodsToSync()
		if len(podsToSync) == 0 {
			break
		}
		klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", format.Pods(podsToSync))
		// 同步最新保存的pod状态
		handler.HandlePodSyncs(podsToSync)
    ...
	}
	return true
}        

syncCh是由syncLoop()方法里面创建的一个定时任务,每秒钟会向syncCh添加一个数据,这个方法会同步所有等待同步的Pod

4)kl.livenessManager.Updates()kl.readinessManager.Updates()kl.startupManager.Updates()

// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	...
	case update := <-kl.livenessManager.Updates():
		// 如果探针健康检查失败,需要更新pod的状态
		if update.Result == proberesults.Failure {
			handleProbeSync(kl, update, handler, "liveness", "unhealthy")
		}
	case update := <-kl.readinessManager.Updates():
		// 当readiness状态变更时,更新容器和pod的状态
		ready := update.Result == proberesults.Success
		kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)

		status := ""
		if ready {
			status = "ready"
		}
		handleProbeSync(kl, update, handler, "readiness", status)
	case update := <-kl.startupManager.Updates():
		// 当startup状态变更时,更新容器和pod的状态
		started := update.Result == proberesults.Success
		kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)

		status := "unhealthy"
		if started {
			status = "started"
		}
		handleProbeSync(kl, update, handler, "startup", status)
    ...
	}
	return true
}       

6)housekeepingCh

// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	...
	case <-housekeepingCh:
		if !kl.sourcesReady.AllReady() {
			// If the sources aren't ready or volume manager has not yet synced the states,
			// skip housekeeping, as we may accidentally delete pods from unready sources.
			klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
		} else {
			start := time.Now()
			klog.V(4).InfoS("SyncLoop (housekeeping)")
			// 执行一些清理工作,包括终止pod workers、删除不想要的pod,移除volumes、pod目录
			if err := handler.HandlePodCleanups(); err != nil {
				klog.ErrorS(err, "Failed cleaning pods")
			}
			duration := time.Since(start)
			if duration > housekeepingWarningDuration {
				klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())
			}
			klog.V(4).InfoS("SyncLoop (housekeeping) end")
		}
	}
	return true
}

housekeepingCh也是由由syncLoop()方法创建的,每两秒钟会触发清理,包括:终止Pod Workers、删除不想要的Pod,移除Volumes、Pod目录等

syncLoopIteration()方法监听如下的channel,根据事件做不同的处理

  • configCh:监听file、HTTP、APIServer的时间更新
  • plegCh:pleg子模块每秒钟调用一次relist,根据最新的PodStatus生成podLiftCycleEvent,然后存入到plegCh中
  • syncCh:定时器管道, 每隔一秒去同步最新保存的Pod状态
  • kl.livenessManager.Updates():如果探针检查失败,需要更新Pod的状态
  • kl.readinessManager.Updates():当readiness状态变更时,更新容器和Pod的状态
  • kl.startupManager.Updates():当startup状态变更时,更新容器和Pod的状态
  • housekeepingCh:每两秒钟会触发Pod清理工作
4)、小结

kubelet启动过程如下图

3、syncLoopIteration()方法中涉及的channel

1)、configCh

configCh相关的代码调用流程如上图,关于syncLoopIteration()方法中configCh的处理逻辑前面已经讲过了,这里来看下kubelet是如何监听APIServer并将Pod信息变化写入configCh的

// pkg/kubelet/kubelet.go
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, nodeHasSynced func() bool) (*config.PodConfig, error) {
	manifestURLHeader := make(http.Header)
	if len(kubeCfg.StaticPodURLHeader) > 0 {
		for k, v := range kubeCfg.StaticPodURLHeader {
			for i := range v {
				manifestURLHeader.Add(k, v[i])
			}
		}
	}

	// source of all configuration
	// 初始化config.PodConfig
	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
	// 添加三种数据来源,分别是file、http、apiServer
	// define file config source
	if kubeCfg.StaticPodPath != "" {
		klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
		config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
	}

	// define url config source
	if kubeCfg.StaticPodURL != "" {
		klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
		config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
	}

	if kubeDeps.KubeClient != nil {
		klog.InfoS("Adding apiserver pod source")
		config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(kubetypes.ApiserverSource))
	}
	return cfg, nil
}

makePodSourceConfig()方法中先初始化config.PodConfig,然后添加三种数据来源:分别是file、http、APIServer,调用cfg.Channel()方法会创建对应的channel

1)NewSourceApiserver()

这里先来看监听APIServer的部分,NewSourceApiserver()方法代码如下:

// pkg/kubelet/config/apiserver.go
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
	// 创建ListWatch,监听当前node的pod变化
	lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", string(nodeName)))

	// The Reflector responsible for watching pods at the apiserver should be run only after
	// the node sync with the apiserver has completed.
	klog.InfoS("Waiting for node sync before watching apiserver pods")
	go func() {
		for {
			if nodeHasSynced() {
				klog.V(4).InfoS("node sync completed")
				break
			}
			time.Sleep(WaitForAPIServerSyncPeriod)
			klog.V(4).InfoS("node sync has not completed yet")
		}
		klog.InfoS("Watching apiserver")
		// 如果node sync完成,调用newSourceApiserverFromLW方法
		newSourceApiserverFromLW(lw, updates)
	}()
}

func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
	send := func(objs []interface{}) {
		var pods []*v1.Pod
		for _, o := range objs {
			pods = append(pods, o.(*v1.Pod))
		}
		// 监听到apiServer当前node的pod信息变化后写入channel,后续listen()方法会监听这个channel接收值
		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
	}
	// 调用client-go API来创建reflector
	r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
	go r.Run(wait.NeverStop)
}

newSourceApiserverFromLW()方法中调用client-go API来创建Reflector,当监听到APIServer中当前Node的Pod信息变化后写入channel

2)cfg.Channel()

makePodSourceConfig()方法中调用cfg.Channel()方法会创建对应的channel

// pkg/kubelet/config/config.go
// 给每个来源注册一个专用的channel
func (c *PodConfig) Channel(source string) chan<- interface{} {
	c.sourcesLock.Lock()
	defer c.sourcesLock.Unlock()
	c.sources.Insert(source)
	// 调用c.mux.Channel方法
	return c.mux.Channel(source)
}
// pkg/util/config/config.go
func (m *Mux) Channel(source string) chan interface{} {
	if len(source) == 0 {
		panic("Channel given an empty name")
	}
	m.sourceLock.Lock()
	defer m.sourceLock.Unlock()
	channel, exists := m.sources[source]
	if exists {
		return channel
	}
	newChannel := make(chan interface{})
	m.sources[source] = newChannel
	// 同时启动goroutine去监听新数据
	// 这里创建的channel最终会传入newSourceApiserverFromLW中定义的send函数中,当监听到apiServer当前node的pod数据变化后会写入channel
	// listen函数会一直监听这个channel来接收数据
	go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
	return newChannel
}

func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
	for update := range listenChannel {
		// 调用Merge方法
		m.merger.Merge(source, update)
	}
}

Channel()方法中创建的channel最终会传入newSourceApiserverFromLW()方法中定义的send()函数中,当监听到APIServer当前Node的Pod信息数据变化后会写入channel,这里的listen()方法会一直监听这个channel来接收数据,listen()方法调用Merge()方法处理接收到的数据

// pkg/kubelet/config/config.go
func (s *podStorage) Merge(source string, change interface{}) error {
	s.updateLock.Lock()
	defer s.updateLock.Unlock()

	seenBefore := s.sourcesSeen.Has(source)
	// 区分pod变更类型
	adds, updates, deletes, removes, reconciles := s.merge(source, change)
	firstSet := !seenBefore && s.sourcesSeen.Has(source)

	// deliver update notifications
	switch s.mode {
	case PodConfigNotificationIncremental:
		// 最终将pod变更信息传入configCh
		if len(removes.Pods) > 0 {
			s.updates <- *removes
		}
		if len(adds.Pods) > 0 {
			s.updates <- *adds
		}
		if len(updates.Pods) > 0 {
			s.updates <- *updates
		}
		if len(deletes.Pods) > 0 {
			s.updates <- *deletes
		}
		if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
			// Send an empty update when first seeing the source and there are
			// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
			// the source is ready.
			s.updates <- *adds
		}
		// Only add reconcile support here, because kubelet doesn't support Snapshot update now.
		if len(reconciles.Pods) > 0 {
			s.updates <- *reconciles
		}

	...
	}

	return nil
}

Merge()方法中会区分Pod变更类型,最终将Pod变更信息传入configCh,kl.syncLoopIteration()方法中监听configCh,交给handler去处理,在handler中通过调用dispatchWork分发任务

configCh数据写入流程如下图:

在这里插入图片描述

2)、plegCh

初始化pleg并运行代码如下:

// pkg/kubelet/pleg/generic.go
func (g *GenericPLEG) Start() {
	go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}

Start()方法中启动一个gorounite函数每一秒执行一次relist()方法

// pkg/kubelet/pleg/generic.go
func (g *GenericPLEG) relist() {
	klog.V(5).InfoS("GenericPLEG: Relisting")

	if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
		metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime))
	}

	timestamp := g.clock.Now()
	defer func() {
		metrics.PLEGRelistDuration.Observe(metrics.SinceInSeconds(timestamp))
	}()

	// Get all the pods.
	// 调用runtime获取当前node的所有pod和container信息(最终调用CRI接口)
	podList, err := g.runtime.GetPods(true)
	if err != nil {
		klog.ErrorS(err, "GenericPLEG: Unable to retrieve pods")
		return
	}

	g.updateRelistTime(timestamp)

	pods := kubecontainer.Pods(podList)
	// update running pod and container count
	updateRunningPodAndContainerMetrics(pods)
	g.podRecords.setCurrent(pods)

	// Compare the old and the current pods, and generate events.
	eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
	for pid := range g.podRecords {
		oldPod := g.podRecords.getOld(pid)
		pod := g.podRecords.getCurrent(pid)
		// Get all containers in the old and the new pod.
		// 获得pod中的所有container
		allContainers := getContainersFromPods(oldPod, pod)
		for _, container := range allContainers {
			// 检查container是否有变化,如果有变化,生成podLifecycleEvent
			events := computeEvents(oldPod, pod, &container.ID)
			for _, e := range events {
				updateEvents(eventsByPodID, e)
			}
		}
	}

	var needsReinspection map[types.UID]*kubecontainer.Pod
	if g.cacheEnabled() {
		needsReinspection = make(map[types.UID]*kubecontainer.Pod)
	}

	// If there are events associated with a pod, we should update the
	// podCache.
	// 遍历所有发生的event的pod
	for pid, events := range eventsByPodID {
		pod := g.podRecords.getCurrent(pid)
		if g.cacheEnabled() {
			// updateCache() will inspect the pod and update the cache. If an
			// error occurs during the inspection, we want PLEG to retry again
			// in the next relist. To achieve this, we do not update the
			// associated podRecord of the pod, so that the change will be
			// detect again in the next relist.
			// TODO: If many pods changed during the same relist period,
			// inspecting the pod and getting the PodStatus to update the cache
			// serially may take a while. We should be aware of this and
			// parallelize if needed.
			if err := g.updateCache(pod, pid); err != nil {
				// Rely on updateCache calling GetPodStatus to log the actual error.
				klog.V(4).ErrorS(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))

				// make sure we try to reinspect the pod during the next relisting
				needsReinspection[pid] = pod

				continue
			} else {
				// this pod was in the list to reinspect and we did so because it had events, so remove it
				// from the list (we don't want the reinspection code below to inspect it a second time in
				// this relist execution)
				delete(g.podsToReinspect, pid)
			}
		}
		// Update the internal storage and send out the events.
		g.podRecords.update(pid)

		// Map from containerId to exit code; used as a temporary cache for lookup
		containerExitCode := make(map[string]int)
		// 遍历这个pod的所有event变化
		for i := range events {
			// Filter out events that are not reliable and no other components use yet.
			if events[i].Type == ContainerChanged {
				continue
			}
			select {
			// 推送到kubelet的plegCh中
			case g.eventChannel <- events[i]:
			default:
				metrics.PLEGDiscardEvents.Inc()
				klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
			}
			// Log exit code of containers when they finished in a particular event
			if events[i].Type == ContainerDied {
				// Fill up containerExitCode map for ContainerDied event when first time appeared
				if len(containerExitCode) == 0 && pod != nil && g.cache != nil {
					// Get updated podStatus
					status, err := g.cache.Get(pod.ID)
					if err == nil {
						for _, containerStatus := range status.ContainerStatuses {
							containerExitCode[containerStatus.ID.ID] = containerStatus.ExitCode
						}
					}
				}
				if containerID, ok := events[i].Data.(string); ok {
					if exitCode, ok := containerExitCode[containerID]; ok && pod != nil {
						klog.V(2).InfoS("Generic (PLEG): container finished", "podID", pod.ID, "containerID", containerID, "exitCode", exitCode)
					}
				}
			}
		}
	}

	if g.cacheEnabled() {
		// reinspect any pods that failed inspection during the previous relist
		if len(g.podsToReinspect) > 0 {
			klog.V(5).InfoS("GenericPLEG: Reinspecting pods that previously failed inspection")
			for pid, pod := range g.podsToReinspect {
				if err := g.updateCache(pod, pid); err != nil {
					// Rely on updateCache calling GetPodStatus to log the actual error.
					klog.V(5).ErrorS(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
					needsReinspection[pid] = pod
				}
			}
		}

		// Update the cache timestamp.  This needs to happen *after*
		// all pods have been properly updated in the cache.
		g.cache.UpdateTime(timestamp)
	}

	// make sure we retain the list of pods that need reinspecting the next time relist is called
	g.podsToReinspect = needsReinspection
}

relist()方法中主要逻辑如下:

  1. 调用runtime获取当前Node的所有Pod和Container信息(最终调用CRI接口)
  2. 遍历所有Pod,检查container是否有变化,如果有变化,生成podLifecycleEvent
  3. 遍历所有发生的event的Pod,遍历Pod的所有event变化,推送到kubelet的plegChg中
3)、syncCh

所有的Pod进入syncLoopIteration()方法后,最终会走到managePodLoop()方法中,会将Pod信息添加到workQueue队列里

// pkg/kubelet/pleg/generic.go
func (p *podWorkers) completeWork(pod *v1.Pod, syncErr error) {
	// Requeue the last update if the last sync returned error.
	switch {
	case syncErr == nil:
		// No error; requeue at the regular resync interval.
		p.workQueue.Enqueue(pod.UID, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
	case strings.Contains(syncErr.Error(), NetworkNotReadyErrorMsg):
		// Network is not ready; back off for short period of time and retry as network might be ready soon.
		p.workQueue.Enqueue(pod.UID, wait.Jitter(backOffOnTransientErrorPeriod, workerBackOffPeriodJitterFactor))
	default:
		// Error occurred during the sync; back off and then retry.
		p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
	}
	p.completeWorkQueueNext(pod.UID)
}

syncCh是由syncLoop()方法里面创建的一个定时任务,每秒钟会调用getPodsToSync()方法从workQueue中获取等待同步的Pod进行同步

4)、小结

kubelet核心流程如下图

在这里插入图片描述

参考:

11.深入k8s:kubelet工作原理及其初始化源码分析

45 | 幕后英雄:SIG-Node与CRI

46 | 解读 CRI 与 容器运行时

kubelet启动&&创建Pod源码分析

kubelet源码分析 syncLoopIteration(一) configCh

kubelet源码分析 syncLoopIteration(二) plegCh、syncCh、relist

kubelet源码分析 housekeeping 定时清理

文章来源:https://blog.csdn.net/qq_40378034/article/details/135181381
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。