kube-apiserver 启动流程

2024-01-09 11:53:24

? kube-apiserver组件负责将Kubernetes的“资源组、资源版本、资源”以RESTful风格的形式对外暴露并提供服务。 在Kubernetes源码中使用了go-restful框架(github.com/emicklei/go-restful),主要原因在于go-restful框架可定制程度高。

? go-restful框架支持多个Container(容器)。一个Container就相当于一个HTTP Server,不同Container之间监控不同的地址和端口,对外提供不同的HTTP服务。每个Container可以包含多个WebService(Web服务),WebService相当于一组不同服务的分类。每个WebService下又包含了多个Router(路由),Router根据HTTP请求的URL路由到对应的处理函数(即Handler Func) 。

? go-restful核心数据结构有Container、WebService、Route。其核心原理是将Container接收到的HTTP请求分发给匹配的WebService,再由WebService分发给Router中的Handler函数。

? kube-apiserver提供了3种HTTP Server服务,用于将庞大的kube-apiserver组件功能进行解耦,这3种HTTP Server分别是APIExtensionsSerer、KubeAPIServer、AggregatorServer。

  • APIExtensionsServer: API扩展服务(扩展器)。该服务提供了CRD(CustomResourceDefinitions)自定义资源服务,开发者可通过CRD对Kubernetes资源进行扩展,例如,通过crd-example扩展Kubernetes资源。 该服务通过CustomResourceDefinitions对象进行管理,并通过extensionsapiserver.Scheme资源注册表管理CRD相关资源。
  • AggregatorServer:API聚合服务(聚合器)。该服务提供了AA(APIAggregator)聚合服务,开发者可通过AA对Kubernetes聚合服务进行扩展,例如,metrics-server是Kubernetes系统集群的核心监控数据的聚合器,它是AggregatorServer服务的扩展实现。API聚合服务通过APIAggregator对象进行管理,并通过aggregatorscheme.Scheme资源注册表管理AA相关资源。
  • KubeAPIServer:API核心服务。该服务提供了Kubernetes内置核心资源服务,不允许开发者随意更改相关资源,例如,Pod、Service等内置核心资源会由Kubernetes官方维护。API核心服务通过Master对象进行管理, 并通过legacyscheme.Scheme资源注册表管理Master相关资源。

整体流程

? kube-apiserver组件启动代码逻辑如下图所示:

资源注册

kube-apiserver组件启动后的第一件事情是将Kubernetes所支持的资源注册到Scheme资源注册表中,这样后面启动的逻辑才能够从Scheme资源注册表中拿到资源信息并启动和运行APIExtensionsServer、KubeAPIServer、AggregatorServer这3种服务。资源的注册过程并不是通过函数调用触发的,而是通过Go语言的导入(import)和初始化(init)机制触发的

kube-apiserver资源注册分为两步:第1步,初始化Scheme资源注册表;第2步,注册Kubernetes所支持的资源。kubeapiserver导入了legacyscheme和controlplane包,在legacyscheme包中进行资源注册表的初始化,controlplane包中注册Kubernetes所支持的资源。

  • 文件pkg/api/legacyscheme/scheme.go中定义了Scheme资源注册表、Codec编解码器及ParameterCodec参数编解码器。它们被定义为全局变量,这些全局变量在kube-apiserver的任何地方都可以被调用,服务于KubeAPIServer。
var (
	// Scheme is the default instanceof runtime.Scheme to which types in the Kubernetes API are already registered.
	Scheme = runtime.NewScheme()
	
	// Codecsprovidesaccesstoencodinganddecodingforthescheme
	Codecs = serializer.NewCodecFactory(Scheme)
	
	// ParameterCodechandlesversioningofobjectsthatareconvertedtoqueryparameters.
	ParameterCodec = runtime.NewParameterCodec(Scheme)
)
  • controlplane包中的import_known_versions.go文件调用了Kubernetes资源下的install包, 通过导入包的机制触发初始化函数。
import (
	//These imports are the API groups the API server will support.
	_ "k8s.io/kubernetes/pkg/apis/admission/install"
	_ "k8s.io/kubernetes/pkg/apis/admissionregistration/install"
	_ "k8s.io/kubernetes/pkg/apis/apiserverinternal/install"
	_ "k8s.io/kubernetes/pkg/apis/apps/install"
	_ "k8s.io/kubernetes/pkg/apis/authentication/install"
	_ "k8s.io/kubernetes/pkg/apis/authorization/install"
	_ "k8s.io/kubernetes/pkg/apis/autoscaling/install"
	_ "k8s.io/kubernetes/pkg/apis/batch/install"
	_ "k8s.io/kubernetes/pkg/apis/certificates/install"
	_ "k8s.io/kubernetes/pkg/apis/coordination/install"
	_ "k8s.io/kubernetes/pkg/apis/core/install"
	_ "k8s.io/kubernetes/pkg/apis/discovery/install"
	_ "k8s.io/kubernetes/pkg/apis/events/install"
	_ "k8s.io/kubernetes/pkg/apis/extensions/install"
	_ "k8s.io/kubernetes/pkg/apis/flowcontrol/install"
	_ "k8s.io/kubernetes/pkg/apis/imagepolicy/install"
	_ "k8s.io/kubernetes/pkg/apis/networking/install"
	_ "k8s.io/kubernetes/pkg/apis/node/install"
	_ "k8s.io/kubernetes/pkg/apis/policy/install"
	_ "k8s.io/kubernetes/pkg/apis/rbac/install"
	_ "k8s.io/kubernetes/pkg/apis/resource/install"
	_ "k8s.io/kubernetes/pkg/apis/scheduling/install"
	_ "k8s.io/kubernetes/pkg/apis/storage/install"
)
  • 每个Kubernetes内部版本资源都定义install包,用于在kube-apiserver启动时注册资源。以apps资源组为例,install包中的init()中将不同版本的资源到注册表中。
  • func init() {
    	Install(legacyscheme.Scheme)
    }
    
    // Install registers the API group and adds types to a scheme
    // AddToScheme()中会调用每种资源定义的 addKnownTypes()函数,addKnownTypes()中会调用scheme.AddKnownTypes()完成资源注册
    func Install(scheme *runtime.Scheme) {
    	utilruntime.Must(apps.AddToScheme(scheme))
    	utilruntime.Must(v1beta1.AddToScheme(scheme))
    	utilruntime.Must(v1beta2.AddToScheme(scheme))
    	utilruntime.Must(v1.AddToScheme(scheme))
    	utilruntime.Must(scheme.SetVersionPriority(v1.SchemeGroupVersion, v1beta2.SchemeGroupVersion, v1beta1.SchemeGroupVersion))
    }

    APIExtensionsServer 和 AggregatorServer 资源注册过程在包 extensionsapiserver 和 aggregatorapiserver中完成,文件cmd/kube-apiserver/app/server.go如下。

import (
	...
	extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
	...
	aggregatorapiserver"k8s.io/kube-aggregator/pkg/apiserver"
	...
	"k8s.io/kubernetes/pkg/api/legacyscheme"
	"k8s.io/kubernetes/pkg/controlplane"
)

创建通用配置

? APIServer通用配置是kube-apiserver不同模块实例化所需的配置,在函数BuildGenericConfig()中实现,流程如下。

  • genericConfig实例化:执行genericapiserver.NewConfig()函数进行实例化,genericConfig.MergedResourceConfig用于设置启用/禁用GV(资源组、资源版本)及其Resource(资源)。如果未在命令行参数中指定启用/禁用的GV,则通过master.DefaultAPIResourceConfigSource启用默认设置的GV及其资源。默认只启用资源版本为Stable的资源,不启用Beta和Alpha资源版本的资源。
  • OpenAPI/Swagger配置:genericConfig.OpenAPIConfig用于生成OpenAPI规范。
  • StorageFactory存储(Etcd)配置:kubeapiserver.NewStorageFactoryConfig函数实例化了storageFactoryConfig对象,该对象定义了kubeapiserver与Etcd的交互方式,例如Etcd认证、Etcd地址、存储前缀等。另外, 该对象也定义了资源存储方式,例如资源信息、资源编码类型、资源状态等。
  • Authentication认证配置:Authentication.ApplyTo()创建认证配置。
  • Authorization授权配置:kube-apiserver通过BuildAuthorizer()函数实例化授权器。

创建APIExtensionsServer

创建APIExtensionsServer的代码在文件vendor/k8s.io/apiextensionsapiserver/pkg/apiserver/apiserver.go中的New()函数实现。

//New returns a new instance of CustomResourceDefinitions from the given config.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
	// 1.创建 GenericAPIServer,APIExtensionsServer的运行依赖于GenericAPIServer,通过c.GenericConfig.New函数创建名为apiextensions-apiserver的服务。
	genericServer, err := c.GenericConfig.New("apiextensions-apiserver",delegationTarget)
	if err!= nil {
		return nil, err
	}
	
	...
	
	// 2.实例化 CustomResourceDefinitions,APIExtensionsServer(API扩展服务)通过CustomResourceDefinitions对象进行管理,
	// 实例化该对象后才能注册APIExtensionsServer下的资源。
	s := &CustomResourceDefinitions{
		GenericAPIServer:genericServer,
	}
	
	apiResourceConfig := c.GenericConfig.MergedResourceConfig
	// 3.实例化 APIGroupInfo,APIGroupInfo对象用于描述资源组信息, 其中该对象的VersionedResourcesStorageMap字段用于存储资源与资源存储对象的对应关系。
	// 表现形式为 map[string]map[string]rest.Storage(即<资源版本>/<资源>/<资源存储对象>) 
	// 注意:一个资源组对应一个APIGroupInfo对象,每个资源(包括子资源) 对应一个资源存储对象。
	apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme,metav1.ParameterCodec, Codecs)
	storage := map[string]rest.Storage{}
	
	//  如果 apiextensions.k8s.io/v1 资源组/资源版本已启用,将该资源组、 资源版本下的资源与资源存储对象进行映射
	//  并存储至 APIGroupInfo 对象的VersionedResourcesStorageMap字段中。
	if resource := "customresourcedefinitions"; apiResourceConfig.ResourceEnabled(v1.SchemeGroupVersion.WithResource(resource)) {
		customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
		if err != nil {
			return nil, err
		}
		storage[resource] = customResourceDefinitionStorage
		storage[resource+"/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
	}
	if len(storage) > 0 {
		apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
	}
	
	// 4.InstallAPIGroup注册APIGroup,在函数 InstallREST()中添加路由,过程如下:
	// (1) 遍历 APIGroupInfo, 将<资源组>/<资源版本>/<资源名称>映射到 HTTP PATH 请求路径,
	// (2) 通过 InstallREST函数将资源存储对象作为资源的 Handlers 方法
	// (3) 最后使用 gorestful的ws.Route将定义好的请求路径和 Handlers 方法添加路由到 go-restful中
	if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
		return nil, err
	}
	
	...
	
	return s, nil
}

APIExtensionsServer路由的设置在InstallREST()函数中调用a.registerResourceHandlers()函数完成,代码调用过程为:InstallAPIGroup()s.installAPIResources()InstallREST()在InstallREST()的最后,通过container.Add函数将WebService添加到go-restful Container中。

func (g*APIGroupVersion) InstallREST(container *restful.Container) ([]apidiscoveryv2beta1.APIResourceDiscovery, []*storageversion.ResourceInfo, error) {
	// prefix定义了 HTTP PATH请求路径,其表现形式为<apiPrefix>/<group>/<version>(即/apis/apiextensions.k8s.io/v1)
	prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
	// 实例化APIInstaller安装器。
	installer := &APIInstaller {
		group: g,
		prefix: prefix,
		minRequestTimeout: g.MinRequestTimeout,
	}
	
	// 在installer.Install中创建一个gorestful WebService,并通过a.registerResourceHandlers()函数添加资源对应的路由
	apiResources, resourceInfos, ws, registrationErrors := installer.Install()
	
	...
	
	// 将WebService添加到 go-restful Container中
	container.Add(ws)
	
	aggregatedDiscoveryResources, err := ConvertGroupVersionIntoToDiscovery(apiResources)
	if err != nil {
		registrationErrors = append(registrationErrors, err)
	}
	return aggregatedDiscoveryResources, removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
}

registerResourceHandlers() 函数中为资源注册对应的Handlers方法(即资源存储对象Resource Storage) ,完成资源与资源Handlers方法的绑定并为go-restful WebService添加该路由。

func (a *APIInstaller) registerResourceHandlers(pathstring, storagerest.Storage, ws*restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
	
	...
	
	// 通过类型断言的方式确定当前storage支持那些操作,确定 storage 是否实现 creat、list 和 get等函数。
	creater, isCreater := storage.(rest.Creater)
	namedCreater, isNamedCreater := storage.(rest.NamedCreater)
	lister, isLister := storage.(rest.Lister)
	getter, isGetter := storage.(rest.Getter)
	getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
	gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
	collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
	updater, isUpdater := storage.(rest.Updater)
	patcher, isPatcher := storage.(rest.Patcher)
	watcher, isWatcher := storage.(rest.Watcher)
	connecter, isConnecter := storage.(rest.Connecter)
	storageMeta, isMetadata := storage.(rest.StorageMetadata)
	storageVersionProvider, isStorageVersionProvider := storage.(rest.StorageVersionProvider)
	gvAcceptor, _ := storage.(rest.GroupVersionAcceptor)
	
	...
	
	// Get the list of actions for the given scope.
	switch {
	// 构造无命名空间namespace资源的action
	case !namespaceScoped:
		
		...
		
	// 构造有命名空间namespace资源的action
	default:
		// 填充 resource path
		namespaceParamName := "namespaces"
		//Handler for standard REST verbs(GET, PUT, POST and DELETE).
		namespaceParam := ws.PathParameter("namespace", "object name and auth scope, such as for teams and projects").DataType("string")
		namespacedPath := namespaceParamName + "/{namespace}/" + resource
		namespaceParams := []*restful.Parameter{namespaceParam}
		
		...
		
		// 根据storage支持的操作添加相关的action到actions切片中
		actions=appendIf(actions,action{"LIST",resourcePath,resourceParams,namer,false},isLister)
		actions=appendIf(actions,action{"POST",resourcePath,resourceParams,namer,false},isCreater)
		actions=appendIf(actions,action{"DELETECOLLECTION",resourcePath,resourceParams,namer,false},isCollectionDeleter)
		
		...
	}
	
	...
	
	// 根据action添加路由
	for _, action := range actions {
		...
		
		// 实例化 restful.RouteBuilder
		routes:=[]*restful.RouteBuilder{}
		
		...
		
		// 按照acation.Verb添加路由
		switch action.Verb {
		case "GET": // Get a resource.
			// 设置路由对应的处理函数,handler处理函数并不是直接调用 Creater ,Updater等接口定义的方法,
			// 而是在外面包了一层代码进行一些额外的处理,例如对象的编解码,admission control 的处理逻辑,针对 watch 这种长链接需要进行协议的处理等,
			// 相关的定义在k8s.io/apiserver/pkg/endpoints/handlers包下。
			var handler restful.RouteFunction
			if isGetterWithOptions {
				// restfulGetResourceWithOptions()->GetResourceWithOptions()->r.Get(), GetResourceWithOptions()中会解析url中的选项,构建runtime.Object,然后交给r.Get()处理
				handler = restfulGetResourceWithOptions(getterWithOptions,reqScope,isSubresource)
			}else{
				// restfulGetResource()->GetResource()->r.Get(), GetResource()中将要查询串解码成metav1.GetOptions,然后交给r.Get()处理
				handler = restfulGetResource(getter,reqScope)
			}
			// 设置路由监控函数
			if needOverride {
				// need change the reported verb
				handler = metrics.InstrumentRouteFunc(verbOverrider.OverrideMetricsVerb(action.Verb),group,version,resource,subresource,requestScope,metrics.APIServerComponent,deprecated,removedRelease,handler)
			}else{
				handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, handler)
			}
			// AddWarningsHandler()中会将警告信息添加到req.Request.Context()中
			handler = utilwarning.AddWarningsHandler(handler, warnings)
			
			doc := "readthespecified" + kind
			if isSubresource {
				doc = "read" + subresource + "ofthespecified" + kind
			}
			// 将 handler注册到 rest容器中
			route := ws.GET(action.Path).To(handler).
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed. Defaults to 'false' unless the user-agent indicates a browser or command-line HTTP tool (curl and wget).")).
				Operation("read" + namespaced + kind + strings.Title(subresource) + operationSuffix).
				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
				Returns(http.StatusOK, "OK", producedObject).
				Writes(producedObject)
			if isGetterWithOptions {
				if err := AddObjectParams(ws, route, versionedGetOptions); err != nil {
					return nil, nil, err
				}
			}
			addParams(route, action.Params)
			routes=append(routes, route)
		case "LIST": // List all resources of a kind.
			...
		case "PUT": // Update a resource.
			...
		case "PATCH": // Partially update a resource.
			...
		case "POST": // Create a resource.
			...
		case "DELETE": // Delete a resource.
			...
		case "WATCH": // Watch a resource.
			...
		case "WATCHLIST": // Watch all resources of a kind.
			...
		case "CONNECT":
			...
		default:
			return nil, nil, fmt.Errorf("unrecognized action verb:%s", action.Verb)
		}
		// 添加所有路由到 WebService中
		for _, route := range routes {
			route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind {
				Group: reqScope.Kind.Group,
				Version: reqScope.Kind.Version,
				Kind: reqScope.Kind.Kind,
			})
			route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
			ws.Route(route)
		}
		//Note: update GetAuthorizerAttributes() when adding a custom handler.
	}
	
	...
}

创建KubeAPIServer

创建KubeAPIServer的流程与创建APIExtensionsServer的流程类似,其原理都是将<资源组>/<资源版本>/<资源>与资源存储对象进行映射并将其存储至APIGroupInfo对象的VersionedResourcesStorageMap字段中。

未完待续。。。

文章来源:https://blog.csdn.net/UFOfuck/article/details/135472994
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。