入口点解析
应用入口点
K8s API Server 的主要入口点位于 cmd/kube-apiserver/apiserver.go 文件的 main 函数中:
源码:https://github.com/kubernetes/kubernetes/blob/v1.26.2/cmd/kube-apiserver/apiserver.go
package main
import (
"os"
_ "time/tzdata" // for timeZone support in CronJob
"k8s.io/component-base/cli"
_ "k8s.io/component-base/logs/json/register" // for JSON log format registration
_ "k8s.io/component-base/metrics/prometheus/clientgo" // load all the prometheus client-go plugins
_ "k8s.io/component-base/metrics/prometheus/version" // for version metric registration
"k8s.io/kubernetes/cmd/kube-apiserver/app"
)
// main is the kube-apiserver process entry point. It builds the command with
// app.NewAPIServerCommand, executes it through cli.Run, and terminates the
// process with the exit code that cli.Run returns.
func main() {
command := app.NewAPIServerCommand()
code := cli.Run(command)
os.Exit(code)
}
其中 app.NewAPIServerCommand() 构建了一个 cobra 命令对象,随后由 cli.Run 执行该命令。
所以直接查看 NewAPIServerCommand 函数是如何构造 cobra.Command 对象的:
源码:https://github.com/kubernetes/kubernetes/blob/v1.26.2/cmd/kube-apiserver/app/server.go#L92-L158
// NewAPIServerCommand creates a *cobra.Command object with default parameters
//
// Only the RunE callback is shown in this excerpt; the other command fields and
// the setup after the struct literal are elided ("// ..."). When invoked, RunE
// applies the logging configuration, prints the flags, completes and validates
// the options, and finally hands the completed options to Run.
func NewAPIServerCommand() *cobra.Command {
// s is created once here and captured by the RunE closure below.
s := options.NewServerRunOptions()
cmd := &cobra.Command{
// ...
RunE: func(cmd *cobra.Command, args []string) error {
// Version flag handling comes first (per its name, prints and exits if requested).
verflag.PrintAndExitIfRequested()
fs := cmd.Flags()
// Activate logging as soon as possible, after that
// show flags with the final logging configuration.
if err := logsapi.ValidateAndApply(s.Logs, utilfeature.DefaultFeatureGate); err != nil {
return err
}
cliflag.PrintFlags(fs)
// set default options
completedOptions, err := Complete(s)
if err != nil {
return err
}
// validate options
// All validation errors are combined into a single aggregate error.
if errs := completedOptions.Validate(); len(errs) != 0 {
return utilerrors.NewAggregate(errs)
}
// add feature enablement metrics
utilfeature.DefaultMutableFeatureGate.AddMetrics()
// The value returned by SetupSignalHandler is passed to Run as its stop channel.
return Run(completedOptions, genericapiserver.SetupSignalHandler())
},
// ...
}
// ...
return cmd
}
- NewAPIServerCommand 使用默认参数创建一个 *cobra.Command 对象
- NewServerRunOptions 使用默认参数创建一个新的 ServerRunOptions 对象
- ServerRunOptions 对象是运行 apiserver 需要的对象
- 依次设置默认选项(set default options)和校验选项(validate options)
该函数最核心的功能就是使用 Complete(s) 函数来补全 apiserver 启动需要的参数,校验通过后再将补全后的参数传递给 Run 函数进行启动
源码:https://github.com/kubernetes/kubernetes/blob/v1.26.2/cmd/kube-apiserver/app/server.go#L160-L178
// Run runs the specified APIServer. This should never exit.
//
// It logs the build version and a few Go runtime environment variables, builds
// the server chain with CreateServerChain, calls PrepareRun on the result, and
// then runs the prepared server with stopCh. An error from any of these steps
// is returned to the caller unchanged.
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())
// The values are read from the process environment via os.Getenv, so a
// variable that is not set is logged as an empty string.
klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
// server is the *aggregatorapiserver.APIAggregator returned by CreateServerChain.
server, err := CreateServerChain(completeOptions)
if err != nil {
return err
}
prepared, err := server.PrepareRun()
if err != nil {
return err
}
return prepared.Run(stopCh)
}
- Run 运行指定的 APIServer,正常情况下该函数不应退出
- CreateServerChain(completeOptions) 创建服务链(包含 3 个 server 组件)
- server.PrepareRun() 完成服务启动前的准备工作,包括健康检查、存活检查、OpenAPI 路由注册等
- prepared.Run(stopCh) 正式启动运行
在 Run 函数中,首先会通过 CreateServerChain 函数创建以委托(delegation)方式串联起来的 APIServer 对象
源码:https://github.com/kubernetes/kubernetes/blob/v1.26.2/cmd/kube-apiserver/app/server.go#L180-L217
// CreateServerChain creates the apiservers connected via delegation.
//
// Construction order in this function: the API extensions server is created
// first, then the kube API server (given the extensions server's
// GenericAPIServer), and finally the aggregator server (given the kube API
// server's GenericAPIServer). The aggregator server is what gets returned.
// Any error from a config or server constructor aborts the chain and is
// returned with a nil server.
func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatorapiserver.APIAggregator, error) {
// The kube API server config is built first; its GenericConfig and ExtraConfig
// fields are reused below when building the other two configs.
kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
if err != nil {
return nil, err
}
// If additional API servers are added, they should be gated.
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))
if err != nil {
return nil, err
}
// The extensions server is given an empty delegate whose custom handler is
// notFoundHandler, i.e. nothing further is chained behind it here.
notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
if err != nil {
return nil, err
}
// The kube API server receives the extensions server's GenericAPIServer as its second argument.
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
if err != nil {
return nil, err
}
// aggregator comes last in the chain
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer)
if err != nil {
return nil, err
}
// The aggregator receives the kube API server's GenericAPIServer plus the
// extensions server's Informers.
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return nil, err
}
return aggregatorServer, nil
}
上面的函数中可以看到 CreateServerChain 会创建 3 个 server:APIExtensionsServer、KubeAPIServer、AggregatorServer
APIServer 就是依靠这 3 个组件来对不同类型的请求进行处理的:
- APIExtensionsServer: 主要负责处理 CustomResourceDefinition(CRD)方面的请求
- KubeAPIServer: 主要负责处理 K8s 内置资源的请求,此外还会包括通用处理、认证、鉴权等
- AggregatorServer: 主要负责聚合器方面的处理,它充当一个代理服务器,将请求转发到聚合进来的 K8s service 中
创建每个 server 都有对应的 config,可以看出上面函数中的 apiExtensionsServer 和 aggregatorServer 的 Config 需要依赖 kubeAPIServerConfig,而这几个 ServerConfig 都需要依赖 GenericConfig。
CreateKubeAPIServerConfig 函数负责创建 kubeAPIServerConfig,在该函数中通过调用 buildGenericConfig 来创建 GenericConfig 对象,如下代码所示: