kubelet源码分析(二):kubelet启动流程解析

2023-12-19 05:25:35

源码环境

kubernetes版本:1.23.0

在这里插入图片描述
本文的源码分析调用链路图如上图所示,话不多说,我们开始逐步分析kebelet的启动流程。

源码分析

首先找到kubelet的入口函数,一般入口函数的位置是在/cmd文件夹下。这里是main函数实现入口,主要是封装了cobra命令行基础类型并启动。Cobra 是一个用于创建命令行应用程序的 Go 语言开源框架。它提供了一套简单而强大的工具,用于定义命令、子命令、标志和参数,以及处理用户输入和执行相应的操作。

Cobra:一种基于golang的命令行开发框架(一)
期望深入了解的读者请参考这篇文章

\k8s.io\kubernetes\cmd\kubelet\kubelet.go
func main() {
 // 组装cobra的命令行实现逻辑,这是是kubelet核心逻辑入口,这里的command的实现类是*cobra.Command,也就是cobra框架的基础类型
   command := app.NewKubeletCommand()
   // kubelet uses a config file and does its own special
   // parsing of flags and that config file. It initializes
   // logging after it is done with that. Therefore it does
   // not use cli.Run like other, simpler commands.
   // 继续追踪run()可以看到这里实现的是cobra框架的启动,可以追踪到该函数中有command.Execute()代码逻辑,该函数实现的是cobra命令行工具启动逻辑
   code := run(command)
   os.Exit(code)
}

继续追踪app.NewKubeletCommand()函数.这个函数主要分为两个部分:

  • 第一部分是通过默认值或者指定的用户配置文件来构建kubelet的启动命令和依赖的配置信息。
  • 第二部分是更多的是构建cobra的Command结构体,这个结构体封装了kubelet的具体执行逻辑。
func NewKubeletCommand() *cobra.Command {
   cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
   cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
 // 1、kubelet配置分两部分:
// KubeletFlag: kubelet的启动参数
// KubeletConfiguration: 指可以在集群中各个Nodes之间共享的配置集,可以进行动态配置,一般解析的路径是:/var/lib/kubelet/config.yaml
   kubeletFlags := options.NewKubeletFlags()
   kubeletConfig, err := options.NewKubeletConfiguration()
   // programmer error
   if err != nil {
      klog.ErrorS(err, "Failed to create a new kubelet configuration")
      os.Exit(1)
   }
   // 2.构建cobra的命令结构体
   cmd := &cobra.Command{   
     .....
     // 这里注意一下,这个参数是不走cobra的flag解析逻辑,所有的flag均有kubelet程
     //序自主解析
      DisableFlagParsing: true,
      Run: func(cmd *cobra.Command, args []string) {
         .....
         //1.处理flag和args的逻辑省略
         // 这里就是上述flag和kubeletConfig的解析结果保存在kubeletServer中
         kubeletServer := &options.KubeletServer{
            KubeletFlags:         *kubeletFlags,
            KubeletConfiguration: *kubeletConfig,
         }
         // 2.KubeletDeps是kubelet启动所依赖的所有资源,包含集群的连接客户
         //端.....
         kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
         if err != nil {
            klog.ErrorS(err, "Failed to construct kubelet dependencies")
            os.Exit(1)
         }
         // 动态kubelet config才依赖这个配置,这里看起来是由一个controller来纳
         //管kubelet的config
         kubeletDeps.KubeletConfigController = kubeletConfigController

         if err := checkPermissions(); err != nil {
            klog.ErrorS(err, "kubelet running with insufficient permissions")
         }
 
         ctx := genericapiserver.SetupSignalContext()
         config := kubeletServer.KubeletConfiguration.DeepCopy()
         for k := range config.StaticPodURLHeader {
            config.StaticPodURLHeader[k] = []string{"<masked>"}
         }
         // log the kubelet's config for inspection
         klog.V(5).InfoS("KubeletConfiguration", "configuration", config)

         // run the kubelet
         // 3.这里启动kubelet
         if err := Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate); err != nil {
            klog.ErrorS(err, "Failed to run kubelet")
            os.Exit(1)
         }
      },
   }
.....
   return cmd
}

额外看一下UnsecuredDependencies这个函数,我想看一下这个kubelet启动依赖的资源有哪些,在这个函数中初始化了docker的相关配置,并且配置了一个ExperimentalMounterPath,这个ExperimentalMounterPath 是一个与 Kubernetes 相关的实验性功能,用于指定容器运行时(如 Docker 或 containerd)在节点上挂载容器文件系统的路径。

func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.FeatureGate) (*kubelet.Dependencies, error) {
   // Initialize the TLS Options
   // 初始化一写些tls的选项TLSCertFile and TLSPrivateKeyFile,tls协议相关的
   tlsOptions, err := InitializeTLS(&s.KubeletFlags, &s.KubeletConfiguration)
   if err != nil {
      return nil, err
   }

   mounter := mount.New(s.ExperimentalMounterPath)
   subpather := subpath.New(mounter)
   hu := hostutil.NewHostUtil()
   var pluginRunner = exec.New()

   var dockerOptions *kubelet.DockerOptions
   // 如果配置的容器运行时是docker,在这里添加默认的docker options
   if s.ContainerRuntime == kubetypes.DockerContainerRuntime {
      dockerOptions = &kubelet.DockerOptions{
         DockerEndpoint:            s.DockerEndpoint,
         RuntimeRequestTimeout:     s.RuntimeRequestTimeout.Duration,
         ImagePullProgressDeadline: s.ImagePullProgressDeadline.Duration,
      }
   }

   plugins, err := ProbeVolumePlugins(featureGate)
   if err != nil {
      return nil, err
   }
   // 这里仅仅初始化了一个空的dependence,应该是在后续流程里初始化其余字段
   return &kubelet.Dependencies{
      Auth:                nil, // default does not enforce auth[nz]
      CAdvisorInterface:   nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here
      Cloud:               nil, // cloud provider might start background processes
      ContainerManager:    nil,
      DockerOptions:       dockerOptions,
      KubeClient:          nil,
      HeartbeatClient:     nil,
      EventClient:         nil,
      HostUtil:            hu,
      Mounter:             mounter,
      Subpather:           subpather,
      OOMAdjuster:         oom.NewOOMAdjuster(),
      OSInterface:         kubecontainer.RealOS{},
      VolumePlugins:       plugins,
      DynamicPluginProber: GetDynamicPluginProber(s.VolumePluginDir, pluginRunner),
      TLSOptions:          tlsOptions}, nil
}

继续追踪上文的NewKubeletCommand中的Run函数,该函数的逻辑是实现落在了\k8s.io\kubernetes\cmd\kubelet\app\server.go代码中。该函数主要做了这么几件事情:
1、为 kubelet 设置默认的 FeatureGates,kubelet 所有的 FeatureGates 可以通过命令参数查看,k8s 中处于 Alpha 状态的 FeatureGates 在组件启动时默认关闭,处于 Beta 和 GA 状态的默认开启。这里主要是解析配置文件,通过配置文件中的配置项决定是不是开启或者关闭某些功能。
2、校验 kubelet 的参数;
3、尝试获取 kubelet 的 lock file,可以在 kubelet 启动时指定,在启动时,kubelet 会尝试创建或获取锁文件,确保自己是当前节点上唯一管理容器生命周期的 kubelet 实例。如果另一个 kubelet 进程已经存在并持有该锁文件,则新启动的 kubelet 实例通常会结束启动过程,因为它无法获得对节点资源的独占访问权。总结来说,kubelet 的锁文件(s.LockFilePath 所指的锁文件)用于保障节点上只运行一个 kubelet 实例,以防止潜在的资源冲突
4、将当前的配置文件注册到 http server /configz URL 中;
5、检查 kubelet 启动模式是否为 standalone 模式,此模式下不会和 apiserver 交互,主要用于 kubelet 的调试;
6、初始化 kubeDeps,kubeDeps 中包含 kubelet 的一些依赖,主要有 KubeClient、EventClient、HeartbeatClient、Auth、cadvisor、ContainerManager;
7、检查是否以 root 用户启动;
8、为进程设置 oom 分数,默认为 -999,分数范围为 [-1000, 1000],越小越不容易被 kill 掉;
9、调用 RunKubelet 方法;
10、检查 kubelet 是否启动了动态配置功能;
11、启动 Healthz http server;
12、如果使用 systemd 启动,通知 systemd kubelet 已经启动;

server.go
func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
   // Set global feature gates based on the value on the initial KubeletServer
   // 从配置项中解析kubelet的各个组件的开关配置信息,忽略
 ......

   // Register current configuration with /configz endpoint
   // 这个我没验证成功,似乎1.18版本后的这个configz端点不存在了,/configz是kubelet的静态api提供的kubulet配置信息的查询接口
   err = initConfigz(&s.KubeletConfiguration)
   if err != nil {
      klog.ErrorS(err, "Failed to register kubelet configuration with configz")
   }

   if len(s.ShowHiddenMetricsForVersion) > 0 {
      metrics.SetShowHidden()
   }

   // About to get clients and such, detect standaloneMode
   // kubelet 的 standaloneMode 是一个配置选项,用于将 kubelet 配置为独立模式。在独立模式下,kubelet 不会连接到 Kubernetes 控制平面,而是以独立的方式运行,并根据本地配置文件和参数来管理容器。
   // 使用 kubelet 的 standaloneMode 可以将节点配置为脱离 Kubernetes 集群的独立容器运行环境。这在一些特定场景下可能会有用,例如在测试环境中快速部署容器应用,或者在不需要完整 Kubernetes 功能的情况下运行单个容器
   standaloneMode := true
   if len(s.KubeConfig) > 0 {
      standaloneMode = false
   }

   if kubeDeps == nil {
      kubeDeps, err = UnsecuredDependencies(s, featureGate)
      if err != nil {
         return err
      }
   }
    //主要用于实例化云厂商提供的接口服务
   if kubeDeps.Cloud == nil {
      if !cloudprovider.IsExternal(s.CloudProvider) {
         cloudprovider.DeprecationWarningForProvider(s.CloudProvider)
         cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
         if err != nil {
            return err
         }
         if cloud != nil {
            klog.V(2).InfoS("Successfully initialized cloud provider", "cloudProvider", s.CloudProvider, "cloudConfigFile", s.CloudConfigFile)
         }
         kubeDeps.Cloud = cloud
      }
   }

// 这里就是之前提到的覆盖主机名的实现入口,可以通过HostnameOverride来覆盖实际的主机名
   hostName, err := nodeutil.GetHostname(s.HostnameOverride)
   if err != nil {
      return err
   }
   nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
   if err != nil {
      return err
   }

   // if in standalone mode, indicate as much by setting all clients to nil
   // kubelet的standloneMode
   switch {
   case standaloneMode:
      //手动清空client
      kubeDeps.KubeClient = nil
      kubeDeps.EventClient = nil
      kubeDeps.HeartbeatClient = nil
      klog.InfoS("Standalone mode, no API client")

   case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
      // 初始化集群的客户端连接
       clientConfig, onHeartbeatFailure, err := buildKubeletClientConfig(ctx, s, nodeName)
      if err != nil {
         return err
      }
      if onHeartbeatFailure == nil {
         return errors.New("onHeartbeatFailure must be a valid function other than nil")
      }
      kubeDeps.OnHeartbeatFailure = onHeartbeatFailure

      kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
      if err != nil {
         return fmt.Errorf("failed to initialize kubelet client: %w", err)
      }

      // make a separate client for events
      eventClientConfig := *clientConfig
      eventClientConfig.QPS = float32(s.EventRecordQPS)
      eventClientConfig.Burst = int(s.EventBurst)
      kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
      if err != nil {
         return fmt.Errorf("failed to initialize kubelet event client: %w", err)
      }

      // make a separate client for heartbeat with throttling disabled and a timeout attached
      heartbeatClientConfig := *clientConfig
      heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
      // The timeout is the minimum of the lease duration and status update frequency
      leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
      if heartbeatClientConfig.Timeout > leaseTimeout {
         heartbeatClientConfig.Timeout = leaseTimeout
      }

      heartbeatClientConfig.QPS = float32(-1)
      kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
      if err != nil {
         return fmt.Errorf("failed to initialize kubelet heartbeat client: %w", err)
      }
   }

   if kubeDeps.Auth == nil {
      auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
      if err != nil {
         return err
      }
      kubeDeps.Auth = auth
      runAuthenticatorCAReload(ctx.Done())
   }

   var cgroupRoots []string
   // 这里return的是""
   nodeAllocatableRoot := cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupsPerQOS, s.CgroupDriver)
   cgroupRoots = append(cgroupRoots, nodeAllocatableRoot)
    // 这里return的是""
   kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
   if err != nil {
      klog.InfoS("Failed to get the kubelet's cgroup. Kubelet system container metrics may be missing.", "err", err)
   } else if kubeletCgroup != "" {
      cgroupRoots = append(cgroupRoots, kubeletCgroup)
   }

 // 这里return的是""
   runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
   if err != nil {
      klog.InfoS("Failed to get the container runtime's cgroup. Runtime system container metrics may be missing.", "err", err)
   } else if runtimeCgroup != "" {
      // RuntimeCgroups is optional, so ignore if it isn't specified
      cgroupRoots = append(cgroupRoots, runtimeCgroup)
   }

   if s.SystemCgroups != "" {
      // SystemCgroups is optional, so ignore if it isn't specified
      cgroupRoots = append(cgroupRoots, s.SystemCgroups)
   }

   if kubeDeps.CAdvisorInterface == nil {
       //cadvisor初始化
      imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
      kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
      if err != nil {
         return err
      }
   }

   // Setup event recorder if required.
   // 初始化事件发送器
   makeEventRecorder(kubeDeps, nodeName)

   if kubeDeps.ContainerManager == nil {
      if s.CgroupsPerQOS && s.CgroupRoot == "" {
         klog.InfoS("--cgroups-per-qos enabled, but --cgroup-root was not specified.  defaulting to /")
         s.CgroupRoot = "/"
      }

      machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()
      if err != nil {
         return err
      }
      reservedSystemCPUs, err := getReservedCPUs(machineInfo, s.ReservedSystemCPUs)
      if err != nil {
         return err
      }
      if reservedSystemCPUs.Size() > 0 {
         // at cmd option validation phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be ok
         klog.InfoS("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved and SystemReserved", "kubeReservedCPUs", s.KubeReserved, "systemReservedCPUs", s.SystemReserved)
         if s.KubeReserved != nil {
            delete(s.KubeReserved, "cpu")
         }
         if s.SystemReserved == nil {
            s.SystemReserved = make(map[string]string)
         }
         s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
         klog.InfoS("After cpu setting is overwritten", "kubeReservedCPUs", s.KubeReserved, "systemReservedCPUs", s.SystemReserved)
      }

      kubeReserved, err := parseResourceList(s.KubeReserved)
      if err != nil {
         return err
      }
      systemReserved, err := parseResourceList(s.SystemReserved)
      if err != nil {
         return err
      }
      var hardEvictionThresholds []evictionapi.Threshold
      // If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
      if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
         hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
         if err != nil {
            return err
         }
      }
      experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
      if err != nil {
         return err
      }

      devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)

      var cpuManagerPolicyOptions map[string]string
      if utilfeature.DefaultFeatureGate.Enabled(features.CPUManager) {
         if utilfeature.DefaultFeatureGate.Enabled(features.CPUManagerPolicyOptions) {
            cpuManagerPolicyOptions = s.CPUManagerPolicyOptions
         } else if s.CPUManagerPolicyOptions != nil {
            return fmt.Errorf("CPU Manager policy options %v require feature gates %q, %q enabled",
               s.CPUManagerPolicyOptions, features.CPUManager, features.CPUManagerPolicyOptions)
         }
      }

     //初始化containManage
      kubeDeps.ContainerManager, err = cm.NewContainerManager(
         kubeDeps.Mounter,
         kubeDeps.CAdvisorInterface,
         cm.NodeConfig{
            RuntimeCgroupsName:    s.RuntimeCgroups,
            SystemCgroupsName:     s.SystemCgroups,
            KubeletCgroupsName:    s.KubeletCgroups,
            ContainerRuntime:      s.ContainerRuntime,
            CgroupsPerQOS:         s.CgroupsPerQOS,
            CgroupRoot:            s.CgroupRoot,
            CgroupDriver:          s.CgroupDriver,
            KubeletRootDir:        s.RootDirectory,
            ProtectKernelDefaults: s.ProtectKernelDefaults,
            NodeAllocatableConfig: cm.NodeAllocatableConfig{
               KubeReservedCgroupName:   s.KubeReservedCgroup,
               SystemReservedCgroupName: s.SystemReservedCgroup,
               EnforceNodeAllocatable:   sets.NewString(s.EnforceNodeAllocatable...),
               KubeReserved:             kubeReserved,
               SystemReserved:           systemReserved,
               ReservedSystemCPUs:       reservedSystemCPUs,
               HardEvictionThresholds:   hardEvictionThresholds,
            },
            QOSReserved:                             *experimentalQOSReserved,
            ExperimentalCPUManagerPolicy:            s.CPUManagerPolicy,
            ExperimentalCPUManagerPolicyOptions:     cpuManagerPolicyOptions,
            ExperimentalCPUManagerReconcilePeriod:   s.CPUManagerReconcilePeriod.Duration,
            ExperimentalMemoryManagerPolicy:         s.MemoryManagerPolicy,
            ExperimentalMemoryManagerReservedMemory: s.ReservedMemory,
            ExperimentalPodPidsLimit:                s.PodPidsLimit,
            EnforceCPULimits:                        s.CPUCFSQuota,
            CPUCFSQuotaPeriod:                       s.CPUCFSQuotaPeriod.Duration,
            ExperimentalTopologyManagerPolicy:       s.TopologyManagerPolicy,
            ExperimentalTopologyManagerScope:        s.TopologyManagerScope,
         },
         s.FailSwapOn,
         devicePluginEnabled,
         kubeDeps.Recorder)

      if err != nil {
         return err
      }
   }

   utilruntime.ReallyCrash = s.ReallyCrashForTesting

   // TODO(vmarmol): Do this through container config.
   oomAdjuster := kubeDeps.OOMAdjuster
   if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
      klog.InfoS("Failed to ApplyOOMScoreAdj", "err", err)
   }
   // 在kubelet启动之前,初始化一些依赖服务,这里启动了docker shim的service
   err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration,
      kubeDeps, &s.ContainerRuntimeOptions,
      s.ContainerRuntime,
      s.RuntimeCgroups,
      s.RemoteRuntimeEndpoint,
      s.RemoteImageEndpoint,
      s.NonMasqueradeCIDR)
   if err != nil {
      return err
   }

   // 看起来kubelet的核心启动逻辑是在这里
   if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
      return err
   }

   // If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loops
   if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&
      kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
      if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {
         return err
      }
   }

   if s.HealthzPort > 0 {
       //启动kubelet的健康检查web服务
      mux := http.NewServeMux()
      healthz.InstallHandler(mux)
      go wait.Until(func() {
         err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
         if err != nil {
            klog.ErrorS(err, "Failed to start healthz server")
         }
      }, 5*time.Second, wait.NeverStop)
   }

   if s.RunOnce {
      return nil
   }

   // If systemd is used, notify it that we have started
   go daemon.SdNotify(false, "READY=1")

   select {
   case <-done:
      break
   case <-ctx.Done():
      break
   }

   return nil
}

继续追踪RunKubelet的函数逻辑。该函数主要做了两件事:

  • 初始化kubelet
  • 启动kubelet
// RunKubelet is responsible for setting up and running a kubelet.  It is used in three different applications:
//
// 1 Integration tests
// 2 Kubelet binary
// 3 Standalone 'kubernetes' binary
//
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
  .......
   // 1、调用 createAndInitKubelet,创建并且实例化一个kubelet结构
   k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
      kubeDeps,
      &kubeServer.ContainerRuntimeOptions,
      kubeServer.ContainerRuntime,
      hostname,
      hostnameOverridden,
      nodeName,
      nodeIPs,
      kubeServer.ProviderID,
      kubeServer.CloudProvider,
      kubeServer.CertDirectory,
      kubeServer.RootDirectory,
      kubeServer.ImageCredentialProviderConfigFile,
      kubeServer.ImageCredentialProviderBinDir,
      kubeServer.RegisterNode,
      kubeServer.RegisterWithTaints,
      kubeServer.AllowedUnsafeSysctls,
      kubeServer.ExperimentalMounterPath,
      kubeServer.KernelMemcgNotification,
      kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
      kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
      kubeServer.MinimumGCAge,
      kubeServer.MaxPerPodContainerCount,
      kubeServer.MaxContainerCount,
      kubeServer.MasterServiceNamespace,
      kubeServer.RegisterSchedulable,
      kubeServer.KeepTerminatedPodVolumes,
      kubeServer.NodeLabels,
      kubeServer.NodeStatusMaxImages,
      kubeServer.KubeletFlags.SeccompDefault || kubeServer.KubeletConfiguration.SeccompDefault,
   )
  .......
   if runOnce {
      if _, err := k.RunOnce(podCfg.Updates()); err != nil {
         return fmt.Errorf("runonce failed: %w", err)
      }
      klog.InfoS("Started kubelet as runonce")
   } else {
      // 2.启动kubelet
      startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
      klog.InfoS("Started kubelet")
   }
   return nil
}

追踪到此处,kubelet初始化和启动的所有执行步骤已经全部准备完毕,我们似乎已经追踪到kubelet的核心初始化逻辑和启动逻辑。
该步骤有两个核心函数,分别是createAndInitKubeletstartKubelet,从名称可以看出这两个函数应该是一个初始化kubelet,一个是启动kubelet。

首先,将视角转移到createAndInitKubelet函数。该函数做了三件事,第一件事是初始化一个kubelet对象,第二件事是把kubelet初始化事件发送到apiserver,第三件事是启动垃圾回收服务,回收 container 和 images。

func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
   kubeDeps *kubelet.Dependencies,
   crOptions *config.ContainerRuntimeOptions,
   containerRuntime string,
   hostname string,
   hostnameOverridden bool,
   nodeName types.NodeName,
   nodeIPs []net.IP,
   providerID string,
   cloudProvider string,
   certDirectory string,
   rootDirectory string,
   imageCredentialProviderConfigFile string,
   imageCredentialProviderBinDir string,
   registerNode bool,
   registerWithTaints []v1.Taint,
   allowedUnsafeSysctls []string,
   experimentalMounterPath string,
   kernelMemcgNotification bool,
   experimentalCheckNodeCapabilitiesBeforeMount bool,
   experimentalNodeAllocatableIgnoreEvictionThreshold bool,
   minimumGCAge metav1.Duration,
   maxPerPodContainerCount int32,
   maxContainerCount int32,
   masterServiceNamespace string,
   registerSchedulable bool,
   keepTerminatedPodVolumes bool,
   nodeLabels map[string]string,
   nodeStatusMaxImages int32,
   seccompDefault bool,
) (k kubelet.Bootstrap, err error) {
   // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
   // up into "per source" synchronizations

   // 1.实例化 kubelet 对象,并对 kubelet 依赖的所有模块进行初始化;
   k, err = kubelet.NewMainKubelet(kubeCfg,
      kubeDeps,
      crOptions,
      containerRuntime,
      hostname,
      hostnameOverridden,
      nodeName,
      nodeIPs,
      providerID,
      cloudProvider,
      certDirectory,
      rootDirectory,
      imageCredentialProviderConfigFile,
      imageCredentialProviderBinDir,
      registerNode,
      registerWithTaints,
      allowedUnsafeSysctls,
      experimentalMounterPath,
      kernelMemcgNotification,
      experimentalCheckNodeCapabilitiesBeforeMount,
      experimentalNodeAllocatableIgnoreEvictionThreshold,
      minimumGCAge,
      maxPerPodContainerCount,
      maxContainerCount,
      masterServiceNamespace,
      registerSchedulable,
      keepTerminatedPodVolumes,
      nodeLabels,
      nodeStatusMaxImages,
      seccompDefault,
   )
   if err != nil {
      return nil, err
   }

   //2.向apiserver发送一条kubelet启动了的event事件
   k.BirthCry()

   //3.启动垃圾回收服务,回收 container 和 images;
   k.StartGarbageCollection()

   return k, nil
}

NewMainKubelet实例化一个新的 Kubelet 对象以及所有必需的内部模块。
1、初始化 PodConfig 即监听 pod 元数据的来源(file,http,apiserver),将不同 source 的 pod configuration 合并到一个结构中;
2、初始化 containerGCPolicy、imageGCPolicy、evictionConfig 配置;
3、启动 serviceInformer 和 nodeInformer;
4、初始化 containerRefManager、oomWatcher;
5、初始化 kubelet 对象;
6、初始化 secretManager、configMapManager;
7、初始化 livenessManager、podManager、statusManager、resourceAnalyzer;
8、调用 kuberuntime.NewKubeGenericRuntimeManager 初始化 containerRuntime;
9、初始化 pleg;
10、初始化 containerGC、containerDeletor、imageManager、containerLogManager;
11、初始化 serverCertificateManager、probeManager、tokenManager、volumePluginMgr、pluginManager、volumeManager;
12、初始化 workQueue、podWorkers、evictionManager;
13、最后注册相关模块的 handler;

func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
   kubeDeps *Dependencies,
   crOptions *config.ContainerRuntimeOptions,
   containerRuntime string,
   hostname string,
   hostnameOverridden bool,
   nodeName types.NodeName,
   nodeIPs []net.IP,
   providerID string,
   cloudProvider string,
   certDirectory string,
   rootDirectory string,
   imageCredentialProviderConfigFile string,
   imageCredentialProviderBinDir string,
   registerNode bool,
   registerWithTaints []v1.Taint,
   allowedUnsafeSysctls []string,
   experimentalMounterPath string,
   kernelMemcgNotification bool,
   experimentalCheckNodeCapabilitiesBeforeMount bool,
   experimentalNodeAllocatableIgnoreEvictionThreshold bool,
   minimumGCAge metav1.Duration,
   maxPerPodContainerCount int32,
   maxContainerCount int32,
   masterServiceNamespace string,
   registerSchedulable bool,
   keepTerminatedPodVolumes bool,
   nodeLabels map[string]string,
   nodeStatusMaxImages int32,
   seccompDefault bool,
) (*Kubelet, error) {
   ......
   var nodeHasSynced cache.InformerSynced
   var nodeLister corelisters.NodeLister

   // If kubeClient == nil, we are running in standalone mode (i.e. no API servers)
   // If not nil, we are running as part of a cluster and should sync w/API
   // 1.初始化informer
   if kubeDeps.KubeClient != nil {
      kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
         options.FieldSelector = fields.Set{metav1.ObjectNameField: string(nodeName)}.String()
      }))
      nodeLister = kubeInformers.Core().V1().Nodes().Lister()
      nodeHasSynced = func() bool {
         return kubeInformers.Core().V1().Nodes().Informer().HasSynced()
      }
      kubeInformers.Start(wait.NeverStop)
      klog.InfoS("Attempting to sync node with API server")
   } else {
      // we don't have a client to sync!
      nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
      nodeLister = corelisters.NewNodeLister(nodeIndexer)
      nodeHasSynced = func() bool { return true }
      klog.InfoS("Kubelet is running in standalone mode, will skip API server sync")
   }
   // kubeDeps的PodConfig
   if kubeDeps.PodConfig == nil {
      var err error
      // 2.初始化 PodConfig 即监听 pod 元数据的来源(file,http,apiserver),将不同 source 的 pod configuration 合并到一个结构中;
      kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, nodeHasSynced)
      if err != nil {
         return nil, err
      }
   }

   containerGCPolicy := kubecontainer.GCPolicy{
      MinAge:             minimumGCAge.Duration,
      MaxPerPodContainer: int(maxPerPodContainerCount),
      MaxContainers:      int(maxContainerCount),
   }

   daemonEndpoints := &v1.NodeDaemonEndpoints{
      KubeletEndpoint: v1.DaemonEndpoint{Port: kubeCfg.Port},
   }

   imageGCPolicy := images.ImageGCPolicy{
      MinAge:               kubeCfg.ImageMinimumGCAge.Duration,
      HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
      LowThresholdPercent:  int(kubeCfg.ImageGCLowThresholdPercent),
   }

   enforceNodeAllocatable := kubeCfg.EnforceNodeAllocatable
   if experimentalNodeAllocatableIgnoreEvictionThreshold {
      // Do not provide kubeCfg.EnforceNodeAllocatable to eviction threshold parsing if we are not enforcing Evictions
      enforceNodeAllocatable = []string{}
   }
   thresholds, err := eviction.ParseThresholdConfig(enforceNodeAllocatable, kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
   if err != nil {
      return nil, err
   }
   evictionConfig := eviction.Config{
      PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
      MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
      Thresholds:               thresholds,
      KernelMemcgNotification:  kernelMemcgNotification,
      PodCgroupRoot:            kubeDeps.ContainerManager.GetPodCgroupRoot(),
   }

   // informer启动serviceInformer和nodeInformer
   .....   

   klet := &Kubelet{
      hostname:                                hostname,
      hostnameOverridden:                      hostnameOverridden,
      nodeName:                                nodeName,
      kubeClient:                              kubeDeps.KubeClient,
      heartbeatClient:                         kubeDeps.HeartbeatClient,
      onRepeatedHeartbeatFailure:              kubeDeps.OnHeartbeatFailure,
      rootDirectory:                           rootDirectory,
      resyncInterval:                          kubeCfg.SyncFrequency.Duration,
      sourcesReady:                            config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
      registerNode:                            registerNode,
      registerWithTaints:                      registerWithTaints,
      registerSchedulable:                     registerSchedulable,
      dnsConfigurer:                           dns.NewConfigurer(kubeDeps.Recorder, nodeRef, nodeIPs, clusterDNS, kubeCfg.ClusterDomain, kubeCfg.ResolverConfig),
      serviceLister:                           serviceLister,
      serviceHasSynced:                        serviceHasSynced,
      nodeLister:                              nodeLister,
      nodeHasSynced:                           nodeHasSynced,
      masterServiceNamespace:                  masterServiceNamespace,
      streamingConnectionIdleTimeout:          kubeCfg.StreamingConnectionIdleTimeout.Duration,
      recorder:                                kubeDeps.Recorder,
      cadvisor:                                kubeDeps.CAdvisorInterface,
      cloud:                                   kubeDeps.Cloud,
      externalCloudProvider:                   cloudprovider.IsExternal(cloudProvider),
      providerID:                              providerID,
      nodeRef:                                 nodeRef,
      nodeLabels:                              nodeLabels,
      nodeStatusUpdateFrequency:               kubeCfg.NodeStatusUpdateFrequency.Duration,
      nodeStatusReportFrequency:               kubeCfg.NodeStatusReportFrequency.Duration,
      os:                                      kubeDeps.OSInterface,
      oomWatcher:                              oomWatcher,
      cgroupsPerQOS:                           kubeCfg.CgroupsPerQOS,
      cgroupRoot:                              kubeCfg.CgroupRoot,
      mounter:                                 kubeDeps.Mounter,
      hostutil:                                kubeDeps.HostUtil,
      subpather:                               kubeDeps.Subpather,
      maxPods:                                 int(kubeCfg.MaxPods),
      podsPerCore:                             int(kubeCfg.PodsPerCore),
      syncLoopMonitor:                         atomic.Value{},
      daemonEndpoints:                         daemonEndpoints,
      containerManager:                        kubeDeps.ContainerManager,
      containerRuntimeName:                    containerRuntime,
      nodeIPs:                                 nodeIPs,
      nodeIPValidator:                         validateNodeIP,
      clock:                                   clock.RealClock{},
      enableControllerAttachDetach:            kubeCfg.EnableControllerAttachDetach,
      makeIPTablesUtilChains:                  kubeCfg.MakeIPTablesUtilChains,
      iptablesMasqueradeBit:                   int(kubeCfg.IPTablesMasqueradeBit),
      iptablesDropBit:                         int(kubeCfg.IPTablesDropBit),
      experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate),
      keepTerminatedPodVolumes:                keepTerminatedPodVolumes,
      nodeStatusMaxImages:                     nodeStatusMaxImages,
      lastContainerStartedTime:                newTimeCache(),
   }
   // 初始化各种manager组件
   .....
   //初始化kubelet的Pod worker
   klet.podWorkers = newPodWorkers(
      klet.syncPod,
      klet.syncTerminatingPod,
      klet.syncTerminatedPod,

      kubeDeps.Recorder,
      klet.workQueue,
      klet.resyncInterval,
      backOffPeriod,
      klet.podCache,
   )

   // 初始化容器运行时接口
   runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
      kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
      klet.livenessManager,
      klet.readinessManager,
      klet.startupManager,
      rootDirectory,
      machineInfo,
      klet.podWorkers,
      kubeDeps.OSInterface,
      klet,
      httpClient,
      imageBackOff,
      kubeCfg.SerializeImagePulls,
      float32(kubeCfg.RegistryPullQPS),
      int(kubeCfg.RegistryBurst),
      imageCredentialProviderConfigFile,
      imageCredentialProviderBinDir,
      kubeCfg.CPUCFSQuota,
      kubeCfg.CPUCFSQuotaPeriod,
      kubeDeps.RemoteRuntimeService,
      kubeDeps.RemoteImageService,
      kubeDeps.ContainerManager.InternalContainerLifecycle(),
      kubeDeps.dockerLegacyService,
      klet.containerLogManager,
      klet.runtimeClassManager,
      seccompDefault,
      kubeCfg.MemorySwap.SwapBehavior,
      kubeDeps.ContainerManager.GetNodeAllocatableAbsolute,
      *kubeCfg.MemoryThrottlingFactor,
   )
   if err != nil {
      return nil, err
   }
   klet.containerRuntime = runtime
   klet.streamingRuntime = runtime
   klet.runner = runtime

   runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
   if err != nil {
      return nil, err
   }
   klet.runtimeCache = runtimeCache

   // common provider to get host file system usage associated with a pod managed by kubelet
   hostStatsProvider := stats.NewHostStatsProvider(kubecontainer.RealOS{}, func(podUID types.UID) (string, bool) {
      return getEtcHostsPath(klet.getPodDir(podUID)), klet.containerRuntime.SupportsSingleFileMapping()
   })
   if kubeDeps.useLegacyCadvisorStats {
      klet.StatsProvider = stats.NewCadvisorStatsProvider(
         klet.cadvisor,
         klet.resourceAnalyzer,
         klet.podManager,
         klet.runtimeCache,
         klet.containerRuntime,
         klet.statusManager,
         hostStatsProvider)
   } else {
      klet.StatsProvider = stats.NewCRIStatsProvider(
         klet.cadvisor,
         klet.resourceAnalyzer,
         klet.podManager,
         klet.runtimeCache,
         kubeDeps.RemoteRuntimeService,
         kubeDeps.RemoteImageService,
         hostStatsProvider,
         utilfeature.DefaultFeatureGate.Enabled(features.DisableAcceleratorUsageMetrics),
         utilfeature.DefaultFeatureGate.Enabled(features.PodAndContainerStatsFromCRI))
   }
   //9、初始化 pleg,pleg
   klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
   klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
   klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
   if _, err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil {
      klog.ErrorS(err, "Pod CIDR update failed")
   }

   // setup containerGC
   containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
   if err != nil {
      return nil, err
   }
   klet.containerGC = containerGC
   klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))

   // setup imageManager
   imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
   if err != nil {
      return nil, fmt.Errorf("failed to initialize image manager: %v", err)
   }
   klet.imageManager = imageManager
   // 初始化各种plugin和manager
....

   return klet, nil
}

上文中的一个字段值得我们关注,还记得我们在kubelet源码分析(一)中提到的,kubelet接收三种形式的PodSpec输入来进行Pod的各种操作,一个是file形式、一种是apiserver、一种是来自云端厂商提供的服务service。我们将目光聚焦于kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, nodeHasSynced),来看看kubeDeps.PodConfig究竟怎么获取Pod的描述信息,解析Pod的描述信息,并且以事件的方式交给kubelet的某一个worker去逐渐处理的。
PodConfig 是一个配置多路复用器,它将许多 Pod 配置源合并成一个单一的一致结构,然后按顺序向监听器传递增量变更通知。入口函数的位置在pkg/kubelet/server.go,kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, nodeHasSynced)。代码详情如下:

func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, nodeHasSynced func() bool) (*config.PodConfig, error) {
   manifestURLHeader := make(http.Header)
   if len(kubeCfg.StaticPodURLHeader) > 0 {
      for k, v := range kubeCfg.StaticPodURLHeader {
         for i := range v {
            manifestURLHeader.Add(k, v[i])
         }
      }
   }

   // 1.初始化PodConfg结构体,
   cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

   // define file config source
   if kubeCfg.StaticPodPath != "" {
      klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
      // 2.注册file数据来源
      config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
   }

   // define url config source
   if kubeCfg.StaticPodURL != "" {
      klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
       // 2.注册URL数据来源
      config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
   }

   if kubeDeps.KubeClient != nil {
      klog.InfoS("Adding apiserver pod source")
       // 3.注册ApiServer的数据来源
      config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(kubetypes.ApiserverSource))
   }
   return cfg, nil
}

cfg := config.NewPodConfig是PodConfig的结构初始化函数,下面的三个函数看起来就有亲缘关系,

  • config.NewSourceFile
    config.NewSourceURL
    config.NewSourceApiserver
    上面的三个方法看起来实现了相同的代码逻辑,即注册数据源,从方法的后缀可以区分数据来源。从注释上可以看出,config.NewSourceFile似乎是加载staticpod所在的目录下配置信息,config.NewSourceApiserver是从apiserver处拿取的pod数据信息,URL这个数据源似乎不常用。我们继续追踪config.NewSourceFile函数,从这个函数来追踪kubelet根据配置文件加载POD执行的详细细节.
func NewSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
   path = strings.TrimRight(path, string(os.PathSeparator))

   // 1.初始化结构体
   config := newSourceFile(path, nodeName, period, updates)
   klog.V(1).InfoS("Watching path", "path", path)
   //2. 数据源初始化
   config.run()
}

func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile {
   // 1.该函数暂时不知道调用逻辑,但是看起来是将配置文件解析成的数组类型数据,包装成kubetypes.PodUpdate,并将发送到指定channel
   send := func(objs []interface{}) {
      var pods []*v1.Pod
      for _, o := range objs {
         pods = append(pods, o.(*v1.Pod))
      }
      updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
   }
   // 2.该结构体不知道具体作用和调用入口,可能是在run方法中有具体调用
   store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)
   return &sourceFile{
      path:           path,
      nodeName:       nodeName,
      period:         period,
      store:          store,
      fileKeyMapping: map[string]string{},
      updates:        updates,
      watchEvents:    make(chan *watchEvent, eventBufferLen),
   }
}

回到config.run方法,关键的实现方法有s.listConfig(),s.consumeWatchEvent(e)和 s.startWatch(),在这个函数主要启动了一个定时任务去不断监听配置文件的变化,从而更新从File来源处获得的Pod描述信息。

func (s *sourceFile) run() {
    // 这是一种golang的定时任务执行实现方式
   listTicker := time.NewTicker(s.period)

   go func() {
      // Read path immediately to speed up startup.
      if err := s.listConfig(); err != nil {
         klog.ErrorS(err, "Unable to read config path", "path", s.path)
      }
      for {
         select {
         case <-listTicker.C:
            if err := s.listConfig(); err != nil {
               klog.ErrorS(err, "Unable to read config path", "path", s.path)
            }
         case e := <-s.watchEvents:
             //这一步是无意义的代码,未实现
            if err := s.consumeWatchEvent(e); err != nil {
               klog.ErrorS(err, "Unable to process watch event")
            }
         }
      }
   }()
   //这一步也是无意义代码,未实现
   s.startWatch()
}

继续回到s.listConfig()方法

func (s *sourceFile) listConfig() error {
    // 这里的path是staticpod的文件路径
   path := s.path
   statInfo, err := os.Stat(path)
   if err != nil {
      if !os.IsNotExist(err) {
         return err
      }
      // Emit an update with an empty PodList to allow FileSource to be marked as seen
      // 这里可能要等到update的channel消费的时候才可以看到,好像是空传了一个消息
      s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.FileSource}
      return fmt.Errorf("path does not exist, ignoring")
   }

   switch {
       // path是文件夹的处理逻辑在这里
   case statInfo.Mode().IsDir():
       // s.extractFromDir输出的结果就是[]*v1.Pod,盲猜这里就是将文件解析成pod struct结构体的地方
      pods, err := s.extractFromDir(path)
      if err != nil {
         return err
      }
      if len(pods) == 0 {
         // Emit an update with an empty PodList to allow FileSource to be marked as seen
         s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
         return nil
      }
      // 核心处理逻辑
      return s.replaceStore(pods...)
  // path是常规文件的处理逻辑在这里
   case statInfo.Mode().IsRegular():
      pod, err := s.extractFromFile(path)
      if err != nil {
         return err
      }
      return s.replaceStore(pod)

   default:
      return fmt.Errorf("path is not a directory or file")
   }
}
  • pods, err := s.extractFromDir(path)
  • pod, err := s.extractFromFile(path)
    上面两个分支看似是处理不同的逻辑,但实则是在做相同的事情,无非一个是读取多个pod文件,一个是处理单个pod文件。深入到pods, err := s.extractFromDir(path) 方法内部,发现最终还是调用了 s.extractFromFile方法。
func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
   klog.V(3).InfoS("Reading config file", "path", filename)
   defer func() {
      if err == nil && pod != nil {
         objKey, keyErr := cache.MetaNamespaceKeyFunc(pod)
         if keyErr != nil {
            err = keyErr
            return
         }
         // 这个结构体是一个pod文件对应pod名称的缓存
         s.fileKeyMapping[filename] = objKey
      }
   }()

   file, err := os.Open(filename)
   if err != nil {
      return pod, err
   }
   defer file.Close()

   //读取文件内容
   data, err := utilio.ReadAtMost(file, maxConfigLength)
   if err != nil {
      return pod, err
   }

   defaultFn := func(pod *api.Pod) error {
      return s.applyDefaults(pod, filename)
   }
   // 解析pod文件
   parsed, pod, podErr := tryDecodeSinglePod(data, defaultFn)
   if parsed {
      if podErr != nil {
         return pod, podErr
      }
      return pod, nil
   }

   return pod, fmt.Errorf("%v: couldn't parse as pod(%v), please check config file", filename, podErr)
}

tryDecodeSinglePod是单个pod的解析步骤,

// tryDecodeSinglePod takes data and tries to extract valid Pod config information from it.
func tryDecodeSinglePod(data []byte, defaultFn defaultFunc) (parsed bool, pod *v1.Pod, err error) {
   // JSON is valid YAML, so this should work for everything.
   // 将文件内容解析成json结构体
   json, err := utilyaml.ToJSON(data)
   if err != nil {
      return false, nil, err
   }
   // 将json结构体解析成runtime.Object
   obj, err := runtime.Decode(legacyscheme.Codecs.UniversalDecoder(), json)
   if err != nil {
      return false, pod, err
   }

   newPod, ok := obj.(*api.Pod)
   // Check whether the object could be converted to single pod.
   if !ok {
      return false, pod, fmt.Errorf("invalid pod: %#v", obj)
   }

   // Apply default values and validate the pod.
   //为pod结构体添加status等字段的默认值
   if err = defaultFn(newPod); err != nil {
      return true, pod, err
   }
   if errs := validation.ValidatePodCreate(newPod, validation.PodValidationOptions{}); len(errs) > 0 {
      return true, pod, fmt.Errorf("invalid pod: %v", errs)
   }
   v1Pod := &v1.Pod{}
   if err := k8s_api_v1.Convert_core_Pod_To_v1_Pod(newPod, v1Pod, nil); err != nil {
      klog.ErrorS(err, "Pod failed to convert to v1", "pod", klog.KObj(newPod))
      return true, nil, err
   }
   return true, v1Pod, nil
}

至此,从文件中解析成一个v1.Pod{}的结构体的步骤完毕了,我们在回到s.listConfig()方法中,如果一切正常,最终对于解析出的所有pod将落入s.replaceStore(pods…)这个方法中执行相关逻辑。这一步骤可以填上store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc),这个未知的结构体的坑上来。
我们首先回顾一下这个Replace函数的具体实现:

// Replace will delete the contents of current store, using instead the given list.
// 'u' takes ownership of the list, you should not reference the list again
// after calling this function.
// The new contents complete state will be sent by calling PushFunc after replacement.
func (u *UndeltaStore) Replace(list []interface{}, resourceVersion string) error {
   if err := u.Store.Replace(list, resourceVersion); err != nil {
      return err
   }
   u.PushFunc(u.Store.List())
   return nil
}

这个函数执行了两个函数逻辑,其一是u.Store.Replace(list, resourceVersion),其二是u.PushFunc(u.Store.List()),首先第一个u.Store是一个线程安全的map结构,主要是用来更新pod缓存数据,pushFunction的初始化逻辑如下,这里主要是将所有的解析出的Pod数据包装成kubetypes.PodUpdate结构体发送到update的channel中:

// cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)

send := func(objs []interface{}) {
   var pods []*v1.Pod
   for _, o := range objs {
      pods = append(pods, o.(*v1.Pod))
   }
   updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
}

我们来追踪一下updates是在哪里被传递进来的,cfg.Channel函数中传递了一个channel到sourceFile结构体中。

config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))

我们沿着cfg.Channel(kubetypes.FileSource)这个方法追踪update channel的生成逻辑。

//就是这里
if kubeCfg.StaticPodPath != "" {
   klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
   config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
}

func (c *PodConfig) Channel(source string) chan<- interface{} {
   c.sourcesLock.Lock()
   defer c.sourcesLock.Unlock()
   // 在source中插入记录,这里的source是一个map,用来根据消息来源渠道来去重
   c.sources.Insert(source)
   // 返回channel通道
   return c.mux.Channel(source)
}

在追踪Channel这个方法之前,我们先看一下mux的具体实现,mux的初始化步骤在NewPodConfig,podConfig初始化的时候实现的,我们将这几段关联的代码联合在一起来琢磨琢磨。Mux中有两个字段,其一是Merger,这个字段主要声明了多数据源合并数据的具体实现细节,其二是source,这个是channel通道缓存结构体。

// Mux is a class for merging configuration from multiple sources.  Changes are
// pushed via channels and sent to the merge function.
type Mux struct {
   // Invoked when an update is sent to a source.
   // 多路复用合并方式
   merger Merger

   // Sources and their lock.
   sourceLock sync.RWMutex
   // Maps source names to channels
   // channel的缓存器
   sources map[string]chan interface{}
}

func NewMux(merger Merger) *Mux {
   mux := &Mux{
      sources: make(map[string]chan interface{}),
      merger:  merger,
   }
   return mux
}

// 初始化步骤在podConfig结构体初始化过程中
// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
   updates := make(chan kubetypes.PodUpdate, 50)
   storage := newPodStorage(updates, mode, recorder)
   podConfig := &PodConfig{
      pods:    storage,
      mux:     config.NewMux(storage),
      updates: updates,
      sources: sets.String{},
   }
   return podConfig
}

函数 c.mux.Channel(source)做的事情很简单,就是返回Mux的source中缓存的channel通道,如果source对应的channel不存在就创建

func (m *Mux) Channel(source string) chan interface{} {
   if len(source) == 0 {
      panic("Channel given an empty name")
   }
   m.sourceLock.Lock()
   defer m.sourceLock.Unlock()
   channel, exists := m.sources[source]
   if exists {
      return channel
   }
   newChannel := make(chan interface{})
   m.sources[source] = newChannel
   go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
   return newChannel
}

函数中的 go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)值得注意,似乎在返回channel通道的同时,还启动了一个协程来持续监听这个通道内的数据信息,并做了处理步骤。

func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
   for update := range listenChannel {
       // 这里的merge在podCofig的初始化的时候被创建
      m.merger.Merge(source, update)
   }
}

channel通道中信息有一个算一个都落入了m.merger.Merge的方法中进行处理,继续追踪Merge的方法的实现上来,其中merge的实现类为podStorage这个结构体,我们顺藤摸瓜,看看Merge函数是怎么实现的

func (s *podStorage) Merge(source string, change interface{}) error {
   s.updateLock.Lock()
   defer s.updateLock.Unlock()

   //这里的sourcesSeen是一个set结构体
   seenBefore := s.sourcesSeen.Has(source)
   // 对pod的变动数据进行归类,分流
   adds, updates, deletes, removes, reconciles := s.merge(source, change)
   // 判断是不是首次创建
   firstSet := !seenBefore && s.sourcesSeen.Has(source)

   // deliver update notifications
   switch s.mode {
       // 根据分流的结果,将podUpdate事件发送到updatechannel中
   case PodConfigNotificationIncremental:
      if len(removes.Pods) > 0 {
         s.updates <- *removes
      }
      if len(adds.Pods) > 0 {
         s.updates <- *adds
      }
      if len(updates.Pods) > 0 {
         s.updates <- *updates
      }
      if len(deletes.Pods) > 0 {
         s.updates <- *deletes
      }
      if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
         // Send an empty update when first seeing the source and there are
         // no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
         // the source is ready.
         s.updates <- *adds
      }
      // Only add reconcile support here, because kubelet doesn't support Snapshot update now.
      if len(reconciles.Pods) > 0 {
         s.updates <- *reconciles
      }

   case PodConfigNotificationSnapshotAndUpdates:
      if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
         s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
      }
      if len(updates.Pods) > 0 {
         s.updates <- *updates
      }
      if len(deletes.Pods) > 0 {
         s.updates <- *deletes
      }

   case PodConfigNotificationSnapshot:
      if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet {
         s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
      }

   case PodConfigNotificationUnknown:
      fallthrough
   default:
      panic(fmt.Sprintf("unsupported PodConfigNotificationMode: %#v", s.mode))
   }

   return nil
}

上面提到了s.merge函数是用来对channel中的pod变更进行分流的函数,我们现在看看这个函数是在做什么事情:

  • 1.首先将待处理的kubetypes.PodUpdate事件类别进行了分组,定义了5个桶分别存放,PodUpdate中包含三个字段
type PodUpdate struct {
   // pods
   Pods   []*v1.Pod
   // 执行的操作
   Op     PodOperation
   // 数据来源
   Source string
}

其中,PodOperation定义了kubelet处理Pod时的不同操作类型。每种操作类型表示在Pod生命周期中的不同状态或事件。以下是每个操作类型的简要解释:
SET: 设置当前源的Pod配置。表示kubelet收到了Pod配置列表的完整更新事件,通常意味着kubelet收到了一组来自特定源(例如API服务器)的Pod信息。
ADD: 表示新增的Pod。这个操作类型意味着在此源新增了一个Pod,kubelet需要将该Pod添加到其内部状态和调度队列中。
DELETE: 表示从此源优雅地删除了Pod。这意味着该Pod已被删除,kubelet需要在完成任何必要的清理工作之后停止容器并删除该Pod。
REMOVE: 表示从此源移除了Pod。在某些情况下,可能不希望等待Pod的优雅删除操作(例如,删除操作耗时过长),这时会发出该操作类型,kubelet将尽快停止容器并删除Pod。
UPDATE: 表示此源中的Pod已更新。这意味着kubelet需要检查Pod配置的变更,并根据新配置更新内部状态和调度队列。
RECONCILE: 表示此源中的Pod有意外状态,kubelet需要与该源对Pod的状态进行协调。这个操作类型通常用于当kubelet检测到Pod状态与预期值不一致或状态变化时(例如,资源使用情况、健康状态等)。
这些操作类型主要用于在kubelet内部管理Pod生命周期事件,以确保kubelet根据当前Pod配置和状态做出正确的操作。

  • 2.划分了四个分支去处理不同PodOperation
    我们仅从ADD这个分支继续分析,在这个分支里,首先是打印从源数据获取的事件中的操作类别,然后使用updatePodsFunc去重置事件
  • 3.updatePodsFunc主要比较历史pod和kubetypes.PodUpdate事件中的pod定义,重新刷新kubelet应该对当前pod的真实操作标识:这里我的理解是,从不同数据源中拿到的pod事件中的操作标识符有可能不是Pod的真实操作标识符,需要比照历史的pod描述信息,重新标记kubelet对于当前pod应该采取的操作,核心逻辑落在checkAndUpdatePod这个方法中,这个方法重新将kubetypes.PodUpdate事件中包含的pod进行归类,重新包装成正确的kubetypes.PodUpdate事件,按照顺序放置到update的channel中
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
   //这里的change是从channel中读取的事件
   s.podLock.Lock()
   defer s.podLock.Unlock()

   // 记录器,记录从channel获取的pod变更记录的归属,分成了5个桶
   addPods := []*v1.Pod{}
   updatePods := []*v1.Pod{}
   deletePods := []*v1.Pod{}
   removePods := []*v1.Pod{}
   reconcilePods := []*v1.Pod{}

// s.pods缓存了不同渠道pod数据信息
   pods := s.pods[source]
   if pods == nil {
      pods = make(map[types.UID]*v1.Pod)
   }

   // updatePodFunc is the local function which updates the pod cache *oldPods* with new pods *newPods*.
   // After updated, new pod will be stored in the pod cache *pods*.
   // Notice that *pods* and *oldPods* could be the same cache.
   updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
      // 1)对pod数据进行去重
      filtered := filterInvalidPods(newPods, source, s.recorder)
      //2)对上面经过筛选的pod数据进行整理,根据checkAndUpdatePod函数确定不同的pod数据变更对应的分类归属
      for _, ref := range filtered {
         // Annotate the pod with the source before any comparison.
         if ref.Annotations == nil {
            ref.Annotations = make(map[string]string)
         }
         ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
         if existing, found := oldPods[ref.UID]; found {
            pods[ref.UID] = existing
            needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
            if needUpdate {
               updatePods = append(updatePods, existing)
            } else if needReconcile {
               reconcilePods = append(reconcilePods, existing)
            } else if needGracefulDelete {
               deletePods = append(deletePods, existing)
            }
            continue
         }
         recordFirstSeenTime(ref)
         pods[ref.UID] = ref
         addPods = append(addPods, ref)
      }
   }

   update := change.(kubetypes.PodUpdate)
   switch update.Op {
   case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
      if update.Op == kubetypes.ADD {
         klog.V(4).InfoS("Adding new pods from source", "source", source, "pods", klog.KObjs(update.Pods))
      } else if update.Op == kubetypes.DELETE {
         klog.V(4).InfoS("Gracefully deleting pods from source", "source", source, "pods", klog.KObjs(update.Pods))
      } else {
         klog.V(4).InfoS("Updating pods from source", "source", source, "pods", klog.KObjs(update.Pods))
      }
      updatePodsFunc(update.Pods, pods, pods)

   case kubetypes.REMOVE:
      klog.V(4).InfoS("Removing pods from source", "source", source, "pods", klog.KObjs(update.Pods))
      for _, value := range update.Pods {
         if existing, found := pods[value.UID]; found {
            // this is a delete
            delete(pods, value.UID)
            removePods = append(removePods, existing)
            continue
         }
         // this is a no-op
      }

   case kubetypes.SET:
      klog.V(4).InfoS("Setting pods for source", "source", source)
      // 标识source来源,重置pod数据源中的事件缓存
      s.markSourceSet(source)
      // Clear the old map entries by just creating a new map
      oldPods := pods
      pods = make(map[types.UID]*v1.Pod)
      updatePodsFunc(update.Pods, oldPods, pods)
      for uid, existing := range oldPods {
         if _, found := pods[uid]; !found {
            // this is a delete
            removePods = append(removePods, existing)
         }
      }

   default:
      klog.InfoS("Received invalid update type", "type", update)

   }

   s.pods[source] = pods

   adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
   updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
   deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
   removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
   reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}

   return adds, updates, deletes, removes, reconciles
}

至此,我们大概已经梳理出了config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))这段代码的含义。
这段代码是一个典型的生产者消费者模式,pod数据的来源有三个分别是文件File,Apiserver、Url,三者各自启动了一个channel通道,并且将数据源的pod数据序列化成v1.Pod{}结构,并且推送到channel通道中。channel通道在创建的时候,启动了一个协程去消费通道中的数据,并将数据进行去重过滤后,修正每一个pod的真实操作标识,归类成不同的操作类型数据,并发送到公共的事件处理channel通道。
在这里插入图片描述
分析到这里,似乎已经走完了一调链路,但是我们还是不知道,当pod的数据汇集到update channel后,kubelet是怎么处理消息通道中的数据的。现在,我们可以将目光回到RunKubelet函数了,这个函数中有一个startKubelet函数现在没有分析。

func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
   // start the kubelet
   // 这里是核心逻辑入口,这里的Updates()函数返回的就是update的channel通道
   go k.Run(podCfg.Updates())

   // start the kubelet server
   if enableServer {
      go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth)
   }
   // readOnlyApi 入口
   if kubeCfg.ReadOnlyPort > 0 {
      go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
   }
   if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
      go k.ListenAndServePodResources()
   }
}

看起来核心的逻辑落在了Run函数中,左饶右绕的看起来着实很痛苦。在这个方法中启动了很多的协程去定时执行一些逻辑
go kl.cloudResourceSyncManager.Run(wait.NeverStop):云端资源的同步?
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop):启动volumeManage
go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop):定时向apiserver同步node节点状态信息
go kl.nodeLeaseController.Run(wait.NeverStop):启动NodeLease机制,kubelet 使用一种称为 “NodeLease” 的机制来向控制平面报告节点的健康状态和可用性
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop):定期更新runtime运行时状态。

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
   //1.创建logserver服务器
   if kl.logServer == nil {
      kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
   }
   if kl.kubeClient == nil {
      klog.InfoS("No API server defined - no node status update will be sent")
   }

   // Start the cloud provider sync manager
   if kl.cloudResourceSyncManager != nil {
       
      go kl.cloudResourceSyncManager.Run(wait.NeverStop)
   }
   // 3、调用 kl.initializeModules 首先启动不依赖 container runtime 的一些模块
   if err := kl.initializeModules(); err != nil {
      kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
      klog.ErrorS(err, "Failed to initialize internal modules")
      os.Exit(1)
   }

   // Start volume manager
   go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

   if kl.kubeClient != nil {
      // Introduce some small jittering to ensure that over time the requests won't start
      // accumulating at approximately the same time from the set of nodes due to priority and
      // fairness effect.
      // 5、执行 kl.syncNodeStatus 定时同步 Node 状态
      go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)
      go kl.fastStatusUpdateOnce()

      // start syncing lease
      //启用 NodeLease 机制
      go kl.nodeLeaseController.Run(wait.NeverStop)
   }
   //6、定期更新runtime状态
   go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

   // Set up iptables util rules
   // 9、执行 kl.syncNetworkUtil 定时同步 iptables 规则
   if kl.makeIPTablesUtilChains {
      kl.initNetworkUtil()
   }

   // Start component sync loops.
   kl.statusManager.Start()

   // Start syncing RuntimeClasses if enabled.
   if kl.runtimeClassManager != nil {
      kl.runtimeClassManager.Start(wait.NeverStop)
   }

   // Start the pod lifecycle event generator.
   // 12、启动 pleg,持续监听container变化并同步到podcache中
   kl.pleg.Start()
   // 13、调用 kl.syncLoop 监听 pod 变化
   kl.syncLoop(updates, kl)
}

至此kubelet的启动流程就梳理完毕。慢慢长路,刚刚启程。如果你在互联网上检索kubelet源码分析,可能你对pleg和syncLoop并不陌生。对于pod的实际操作过程实际上是在这两个步骤中执行的。后文将继续分析kubelet中pod的创建流程。敬请期待。

后记

笔者作为云原生的初学者,在源码分析的过程中难免会出现一些错误,也留了一些坑没有填上,期望大家能够在编辑器中随着笔者的行文思路去逐步阅读kubelet的源码。欢迎大家指正,也期望大家共同进步。

文章来源:https://blog.csdn.net/weixin_38757398/article/details/134970204
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。