Entry Point Analysis
Application Entry Point
The main entry point of the K8s API Server is in the file cmd/kube-apiserver/apiserver.go:
Source: https://github.com/kubernetes/kubernetes/blob/v1.26.2/cmd/kube-apiserver/apiserver.go
package main

import (
	"os"
	_ "time/tzdata" // for timeZone support in CronJob

	"k8s.io/component-base/cli"
	_ "k8s.io/component-base/logs/json/register"          // for JSON log format registration
	_ "k8s.io/component-base/metrics/prometheus/clientgo" // load all the prometheus client-go plugins
	_ "k8s.io/component-base/metrics/prometheus/version"  // for version metric registration
	"k8s.io/kubernetes/cmd/kube-apiserver/app"
)

func main() {
	command := app.NewAPIServerCommand()
	code := cli.Run(command)
	os.Exit(code)
}
Here app.NewAPIServerCommand() builds a cobra command object, and cli.Run then executes that command and returns the process exit code.
So let's look directly at how the NewAPIServerCommand function constructs the cobra.Command object:
Source: https://github.com/kubernetes/kubernetes/blob/v1.26.2/cmd/kube-apiserver/app/server.go#L92-L158
// NewAPIServerCommand creates a *cobra.Command object with default parameters
func NewAPIServerCommand() *cobra.Command {
	s := options.NewServerRunOptions()
	cmd := &cobra.Command{
		// ...
		RunE: func(cmd *cobra.Command, args []string) error {
			verflag.PrintAndExitIfRequested()
			fs := cmd.Flags()

			// Activate logging as soon as possible, after that
			// show flags with the final logging configuration.
			if err := logsapi.ValidateAndApply(s.Logs, utilfeature.DefaultFeatureGate); err != nil {
				return err
			}
			cliflag.PrintFlags(fs)

			// set default options
			completedOptions, err := Complete(s)
			if err != nil {
				return err
			}

			// validate options
			if errs := completedOptions.Validate(); len(errs) != 0 {
				return utilerrors.NewAggregate(errs)
			}

			// add feature enablement metrics
			utilfeature.DefaultMutableFeatureGate.AddMetrics()
			return Run(completedOptions, genericapiserver.SetupSignalHandler())
		},
		// ...
	}
	// ...
	return cmd
}
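- NewAPIServerCommand creates a *cobra.Command object with default parameters
- NewServerRunOptions creates a new ServerRunOptions object with default parameters
- The ServerRunOptions object holds the options needed to run the apiserver
- RunE then sets the default options (set default options) and validates them (validate options), in that order
The core job of this function is to call Complete(s) to fill in the default parameters the apiserver needs at startup, and then to pass the completed options to the Run function to start the server.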
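The complete, validate, run ordering described above can be sketched in a few lines of self-contained Go. Everything here is hypothetical and greatly reduced: `serverOptions`, its two fields, and the default values are illustrative only, and `errors.Join` (Go 1.20+) is swapped in where the real code uses `utilerrors.NewAggregate`.

```go
package main

import (
	"errors"
	"fmt"
)

// serverOptions is a hypothetical, much-reduced stand-in for ServerRunOptions.
type serverOptions struct {
	BindAddress string
	SecurePort  int
}

// completedOptions wraps options that have gone through complete, so run
// can only be called with options that already carry their defaults.
type completedOptions struct {
	*serverOptions
}

// complete fills in a default for every field the user left unset.
// The default values are illustrative assumptions.
func complete(o *serverOptions) (completedOptions, error) {
	if o.BindAddress == "" {
		o.BindAddress = "0.0.0.0"
	}
	if o.SecurePort == 0 {
		o.SecurePort = 6443
	}
	return completedOptions{o}, nil
}

// validate collects every problem instead of stopping at the first one.
func (c completedOptions) validate() []error {
	var errs []error
	if c.SecurePort < 1 || c.SecurePort > 65535 {
		errs = append(errs, fmt.Errorf("secure port %d is out of range", c.SecurePort))
	}
	return errs
}

// run is where a real server would start; the sketch only reports its input.
func run(c completedOptions) error {
	fmt.Printf("would listen on %s:%d\n", c.BindAddress, c.SecurePort)
	return nil
}

// start mirrors the order used in RunE: complete, validate, then run.
func start(o *serverOptions) error {
	c, err := complete(o)
	if err != nil {
		return err
	}
	if errs := c.validate(); len(errs) != 0 {
		return errors.Join(errs...)
	}
	return run(c)
}

func main() {
	if err := start(&serverOptions{}); err != nil {
		fmt.Println("error:", err)
	}
}
```

Giving the completed options their own type lets the compiler enforce that nothing reaches `run` without having been defaulted first, which appears to be the reason Run takes a completedServerRunOptions rather than the raw options.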
Source: https://github.com/kubernetes/kubernetes/blob/v1.26.2/cmd/kube-apiserver/app/server.go#L160-L178
// Run runs the specified APIServer. This should never exit.
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
	// To help debugging, immediately log version
	klog.Infof("Version: %+v", version.Get())

	klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))

	server, err := CreateServerChain(completeOptions)
	if err != nil {
		return err
	}

	prepared, err := server.PrepareRun()
	if err != nil {
		return err
	}

	return prepared.Run(stopCh)
}
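- Run runs the specified APIServer and should never exit
- CreateServerChain(completeOptions) creates the server chain (made up of 3 server components)
- server.PrepareRun() does the preparation work before startup, including registering health checks, liveness checks, and OpenAPI routes
- prepared.Run(stopCh) actually starts the server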
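The prepare-then-run sequence with a stop channel can be sketched as follows. This is an illustration under stated assumptions, not the apiserver's code: the `server` and `preparedServer` types are hypothetical, the check names are placeholders, and a goroutine closing the channel stands in for the signal handler.

```go
package main

import (
	"fmt"
	"time"
)

// server is a hypothetical stand-in for the object returned by CreateServerChain.
type server struct {
	healthChecks []string
}

// preparedServer can only be obtained through prepareRun, so run cannot be
// called on a server that skipped its preparation step.
type preparedServer struct {
	*server
}

// prepareRun does the one-time setup that must happen before serving.
// The check names are illustrative placeholders.
func (s *server) prepareRun() (preparedServer, error) {
	s.healthChecks = append(s.healthChecks, "healthz", "livez")
	return preparedServer{s}, nil
}

// run blocks until stopCh is closed, then returns.
func (p preparedServer) run(stopCh <-chan struct{}) error {
	fmt.Println("serving with checks:", p.healthChecks)
	<-stopCh
	fmt.Println("stop signal received, shutting down")
	return nil
}

func main() {
	stopCh := make(chan struct{})
	// Simulate a termination signal arriving shortly after startup.
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(stopCh)
	}()

	prepared, err := (&server{}).prepareRun()
	if err != nil {
		fmt.Println("prepare failed:", err)
		return
	}
	if err := prepared.run(stopCh); err != nil {
		fmt.Println("run failed:", err)
	}
}
```

Because `run` only returns once the channel is closed, the caller blocks for the lifetime of the process, which matches the "should never exit" comment on Run.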
Inside Run, the CreateServerChain function is called first to create the APIServer objects, which are connected to each other via delegation:
Source: https://github.com/kubernetes/kubernetes/blob/v1.26.2/cmd/kube-apiserver/app/server.go#L180-L217
// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatorapiserver.APIAggregator, error) {
	kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
	if err != nil {
		return nil, err
	}

	// If additional API servers are added, they should be gated.
	apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
		serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))
	if err != nil {
		return nil, err
	}

	notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
	apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
	if err != nil {
		return nil, err
	}

	kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
	if err != nil {
		return nil, err
	}

	// aggregator comes last in the chain
	aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer)
	if err != nil {
		return nil, err
	}
	aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
	if err != nil {
		// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
		return nil, err
	}

	return aggregatorServer, nil
}
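The function above shows that CreateServerChain creates 3 servers: APIExtensionsServer, KubeAPIServer, and AggregatorServer.
The APIServer relies on these 3 components to handle different kinds of requests:
- APIExtensionsServer: mainly handles requests related to CustomResourceDefinitions (CRDs)
- KubeAPIServer: mainly handles requests for built-in K8s resources, and also covers generic request handling, authentication, authorization, and so on
- AggregatorServer: mainly handles aggregation; it acts as a proxy server that forwards requests to the K8s services that have been aggregated in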

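Each server is created from its own config. In the function above, the configs for apiExtensionsServer and aggregatorServer both depend on kubeAPIServerConfig, and all of these server configs in turn depend on GenericConfig. The CreateKubeAPIServerConfig function creates kubeAPIServerConfig, and inside it the GenericConfig object is created by calling buildGenericConfig, as shown in the code below.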