Kubelet CNI 源码解析

  • cmd/kubelet/kubelet.go

  • pkg/kubelet/kubelet.go

注: 基于 Kubernetes release-1.9

网络

Kubernetes 容器使用的网络规范为 CNI(容器网络接口),CNI 包括方法规范和参数规范。Kubernetes 并不实际去操作容器的网络,而是通过遵循 CNI 规范的各种网络插件去管理容器网络资源,如 CalicoFlannelContiv netplugin 网络插件等。

CNI 接口

  • github.com/containernetworking/cni/libcni/api.go

CNI 接口只需要实现以下方法,实际就是两种,一个添加网络调用,一个删除调用:

type CNI interface {
AddNetworkList(net *NetworkConfigList, rt *RuntimeConf) (types.Result, error)
DelNetworkList(net *NetworkConfigList, rt *RuntimeConf) error
AddNetwork(net *NetworkConfig, rt *RuntimeConf) (types.Result, error)
DelNetwork(net *NetworkConfig, rt *RuntimeConf) error
}

网络初始化

Kubelet 启动过程中针对网络主要做以下步骤,分别是探针获取当前环境的网络插件以及初始化网络。

步骤 1:探针获取当前环境的网络插件

  • cmd/kubelet/app/server.go

func UnsecuredDependencies(s *options.KubeletServer) (*kubelet.Dependencies, error) {
... ...
// 执行具体函数,获取当前环境的网络插件
NetworkPlugins: ProbeNetworkPlugins(s.CNIConfDir, s.CNIBinDir),
... ...
}
  • cmd/kubelet/app/plugins.go

// ProbeNetworkPlugins collects all compiled-in plugins
func ProbeNetworkPlugins(cniConfDir, cniBinDir string) []network.NetworkPlugin {
allPlugins := []network.NetworkPlugin{}
// for each existing plugin, add to the list
allPlugins = append(allPlugins, cni.ProbeNetworkPlugins(cniConfDir, cniBinDir)...)
allPlugins = append(allPlugins, kubenet.NewPlugin(cniBinDir))
return allPlugins
}
  • pkg/kubelet/network/plugins.go

以下是 kubelet NetworkPlugin 接口,pkg/kubelet/network/cni/cni.gocniNetworkPlugin 实现了这套接口:

// Plugin is an interface to network plugins for the kubelet
type NetworkPlugin interface {
// Init initializes the plugin. This will be called exactly once
// before any other methods are called.
Init(host Host, hairpinMode kubeletconfig.HairpinMode, nonMasqueradeCIDR string, mtu int) error
// Called on various events like:
// NET_PLUGIN_EVENT_POD_CIDR_CHANGE
Event(name string, details map[string]interface{})
// Name returns the plugin's name. This will be used when searching
// for a plugin by name, e.g.
Name() string
// Returns a set of NET_PLUGIN_CAPABILITY_*
Capabilities() utilsets.Int
// SetUpPod is the method called after the infra container of
// the pod has been created but before the other containers of the
// pod are launched.
SetUpPod(namespace string, name string, podSandboxID kubecontainer.ContainerID, annotations map[string]string) error
// TearDownPod is the method called before a pod's infra container will be deleted
TearDownPod(namespace string, name string, podSandboxID kubecontainer.ContainerID) error
// GetPodNetworkStatus is the method called to obtain the ipv4 or ipv6 addresses of the container
GetPodNetworkStatus(namespace string, name string, podSandboxID kubecontainer.ContainerID) (*PodNetworkStatus, error)
// Status returns error if the network plugin is in error state
Status() error
}
  • pkg/kubelet/network/cni/cni.go

func probeNetworkPluginsWithVendorCNIDirPrefix(pluginDir, binDir, vendorCNIDirPrefix string) []network.NetworkPlugin {
if binDir == "" {
// DefaultCNIDir 默认值为 `/opt/cni/bin`
binDir = DefaultCNIDir
}
plugin := &cniNetworkPlugin{
defaultNetwork: nil,
// 默认会设置 loNetwork 用于添加 lo 设备,所以在 binDir 下,即 CNI 插件目录下必须需要 `loopback` 插件
loNetwork: getLoNetwork(binDir, vendorCNIDirPrefix),
execer: utilexec.New(),
pluginDir: pluginDir,
binDir: binDir,
vendorCNIDirPrefix: vendorCNIDirPrefix,
}
// sync NetworkConfig in best effort during probing.
// 探测网络,并同步网络配置,此处没有针对 err 处理,syncNetworkConfig 函数执行错误只会记录相关日志
plugin.syncNetworkConfig()
// 虽然是个列表,但运行时只会支持一种插件
return []network.NetworkPlugin{plugin}
}
func ProbeNetworkPlugins(pluginDir, binDir string) []network.NetworkPlugin {
return probeNetworkPluginsWithVendorCNIDirPrefix(pluginDir, binDir, "")
}
... ...
// 探测网络,并设置插件默认网络
func (plugin *cniNetworkPlugin) syncNetworkConfig() {
network, err := getDefaultCNINetwork(plugin.pluginDir, plugin.binDir, plugin.vendorCNIDirPrefix)
if err != nil {
glog.Warningf("Unable to update cni config: %s", err)
return
}
plugin.setDefaultNetwork(network)
}
func getDefaultCNINetwork(pluginDir, binDir, vendorCNIDirPrefix string) (*cniNetwork, error) {
// 默认 pluginDir `/etc/cni/net.d`
if pluginDir == "" {
pluginDir = DefaultNetDir
}
files, err := libcni.ConfFiles(pluginDir, []string{".conf", ".conflist", ".json"})
switch {
case err != nil:
return nil, err
case len(files) == 0:
return nil, fmt.Errorf("No networks found in %s", pluginDir)
}
sort.Strings(files)
// 遍历所有的配置文件,只要匹配文件满足条件就返回,因此多个配置设置是无效的
for _, confFile := range files {
var confList *libcni.NetworkConfigList
if strings.HasSuffix(confFile, ".conflist") {
confList, err = libcni.ConfListFromFile(confFile)
if err != nil {
glog.Warningf("Error loading CNI config list file %s: %v", confFile, err)
continue
}
} else {
conf, err := libcni.ConfFromFile(confFile)
if err != nil {
glog.Warningf("Error loading CNI config file %s: %v", confFile, err)
continue
}
// Ensure the config has a "type" so we know what plugin to run.
// Also catches the case where somebody put a conflist into a conf file.
if conf.Network.Type == "" {
glog.Warningf("Error loading CNI config file %s: no 'type'; perhaps this is a .conflist?", confFile)
continue
}
confList, err = libcni.ConfListFromConf(conf)
if err != nil {
glog.Warningf("Error converting CNI config file %s to list: %v", confFile, err)
continue
}
}
if len(confList.Plugins) == 0 {
glog.Warningf("CNI config list %s has no networks, skipping", confFile)
continue
}
confType := confList.Plugins[0].Network.Type
// Search for vendor-specific plugins as well as default plugins in the CNI codebase.
vendorDir := vendorCNIDir(vendorCNIDirPrefix, confType)
cninet := &libcni.CNIConfig{
Path: []string{vendorDir, binDir},
}
network := &cniNetwork{name: confList.Name, NetworkConfig: confList, CNIConfig: cninet}
return network, nil
}
return nil, fmt.Errorf("No valid networks found in %s", pluginDir)
}

步骤 2:初始化网络插件

  • pkg/kubelet/kubelet.go

plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, crOptions.NetworkPluginName, &criNetworkHost{&networkHost{klet}, &network.NoopPortMappingGetter{}}, hairpinMode, nonMasqueradeCIDR, int(crOptions.NetworkPluginMTU))
if err != nil {
return nil, err
}
klet.networkPlugin = plug
  • pkg/kubelet/network/plugins.go

// InitNetworkPlugin inits the plugin that matches networkPluginName. Plugins must have unique names.
func InitNetworkPlugin(plugins []NetworkPlugin, networkPluginName string, host Host, hairpinMode kubeletconfig.HairpinMode, nonMasqueradeCIDR string, mtu int) (NetworkPlugin, error) {
// 如果未指定网络插件 `--network-plugin`,默认为 `noop` 插件,使用 CNI 网络,指定该插件为 `cni`,
// 关于 `noop` 具体详见官方说明 https://kubernetes.io/docs/concepts/cluster-administration/network-plugins/
if networkPluginName == "" {
// default to the no_op plugin
plug := &NoopNetworkPlugin{}
plug.Sysctl = utilsysctl.New()
// `noop` 网络初始化
if err := plug.Init(host, hairpinMode, nonMasqueradeCIDR, mtu); err != nil {
return nil, err
}
return plug, nil
}
pluginMap := map[string]NetworkPlugin{}
allErrs := []error{}
for _, plugin := range plugins {
name := plugin.Name()
if errs := validation.IsQualifiedName(name); len(errs) != 0 {
allErrs = append(allErrs, fmt.Errorf("network plugin has invalid name: %q: %s", name, strings.Join(errs, ";")))
continue
}
if _, found := pluginMap[name]; found {
allErrs = append(allErrs, fmt.Errorf("network plugin %q was registered more than once", name))
continue
}
pluginMap[name] = plugin
}
// 确认是否和与指定的网络插件匹配,如果匹配则进行相关初始化
chosenPlugin := pluginMap[networkPluginName]
if chosenPlugin != nil {
err := chosenPlugin.Init(host, hairpinMode, nonMasqueradeCIDR, mtu)
if err != nil {
allErrs = append(allErrs, fmt.Errorf("Network plugin %q failed init: %v", networkPluginName, err))
} else {
glog.V(1).Infof("Loaded network plugin %q", networkPluginName)
}
} else {
allErrs = append(allErrs, fmt.Errorf("Network plugin %q not found.", networkPluginName))
}
return chosenPlugin, utilerrors.NewAggregate(allErrs)
}

上文 hairpinMode 设置 haripin NAT 方式,使得服务后端 endpoints 访问服务自身时负载到本地,配置项为 --hairpin-mode,默认值 promiscuous-bridge

  • pkg/kubelet network/cni/cni.go

func (plugin *cniNetworkPlugin) Init(host network.Host, hairpinMode kubeletconfig.HairpinMode, nonMasqueradeCIDR string, mtu int) error {
// platformInit 用于确定主机是否有 `nsenter` 命令
err := plugin.platformInit()
if err != nil {
return err
}
plugin.host = host
plugin.syncNetworkConfig()
return nil
}

网络操作

网络操作主要是 Pod 创建的网络添加以及删除的网络回收操作,上文中介绍了 NetworkPlugin 接口,其中包含了添加网络和删除网络的方法:

  • pkg/kubelet/network/plugins.go

// Plugin is an interface to network plugins for the kubelet
type NetworkPlugin interface {
... ...
// SetUpPod is the method called after the infra container of
// the pod has been created but before the other containers of the
// pod are launched.
SetUpPod(namespace string, name string, podSandboxID kubecontainer.ContainerID, annotations map[string]string) error
// TearDownPod is the method called before a pod's infra container will be deleted
TearDownPod(namespace string, name string, podSandboxID kubecontainer.ContainerID) error
... ...
}

以下为 Kubelet 调用 CNI 网络的具体操作实现:

添加网络

  • pkg/kubelet/network/cni/cni.go

func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error {
if err := plugin.checkInitialized(); err != nil {
return err
}
// 通过 GetNetNS() 获取指定容器 net 命名空间路径,格式为 `/proc/<pid>/net`
// pkg/kubelet/dockershim/helpers_linux.go `getNetworkNamespace`
netnsPath, err := plugin.host.GetNetNS(id.ID)
if err != nil {
return fmt.Errorf("CNI failed to retrieve network namespace path: %v", err)
}
// Windows doesn't have loNetwork. It comes only with Linux
// 给容器生成 lo 网卡
if plugin.loNetwork != nil {
if _, err = plugin.addToNetwork(plugin.loNetwork, name, namespace, id, netnsPath); err != nil {
glog.Errorf("Error while adding to cni lo network: %s", err)
return err
}
}
_, err = plugin.addToNetwork(plugin.getDefaultNetwork(), name, namespace, id, netnsPath)
if err != nil {
glog.Errorf("Error while adding to cni network: %s", err)
return err
}
return err
}
... ...
func (plugin *cniNetworkPlugin) addToNetwork(network *cniNetwork, podName string, podNamespace string, podSandboxID kubecontainer.ContainerID, podNetnsPath string) (cnitypes.Result, error) {
rt, err := plugin.buildCNIRuntimeConf(podName, podNamespace, podSandboxID, podNetnsPath)
if err != nil {
glog.Errorf("Error adding network when building cni runtime conf: %v", err)
return nil, err
}
netConf, cniNet := network.NetworkConfig, network.CNIConfig
glog.V(4).Infof("About to add CNI network %v (type=%v)", netConf.Name, netConf.Plugins[0].Network.Type)
res, err := cniNet.AddNetworkList(netConf, rt)
if err != nil {
glog.Errorf("Error adding network: %v", err)
return nil, err
}
return res, nil
}
  • github.com/containernetworking/cni/libcni/api.go

// AddNetworkList executes a sequence of plugins with the ADD command
func (c *CNIConfig) AddNetworkList(list *NetworkConfigList, rt *RuntimeConf) (types.Result, error) {
var prevResult types.Result
for _, net := range list.Plugins {
pluginPath, err := invoke.FindInPath(net.Network.Type, c.Path)
if err != nil {
return nil, err
}
newConf, err := buildOneConfig(list, net, prevResult, rt)
if err != nil {
return nil, err
}
// 调用插件添加网络
prevResult, err = invoke.ExecPluginWithResult(pluginPath, newConf.Bytes, c.args("ADD", rt))
if err != nil {
return nil, err
}
}
return prevResult, nil
}

删除网络

  • pkg/kubelet/network/cni/cni.go

func (plugin *cniNetworkPlugin) TearDownPod(namespace string, name string, id kubecontainer.ContainerID) error {
if err := plugin.checkInitialized(); err != nil {
return err
}
// Lack of namespace should not be fatal on teardown
netnsPath, err := plugin.host.GetNetNS(id.ID)
if err != nil {
glog.Warningf("CNI failed to retrieve network namespace path: %v", err)
}
return plugin.deleteFromNetwork(plugin.getDefaultNetwork(), name, namespace, id, netnsPath)
}
... ...
func (plugin *cniNetworkPlugin) deleteFromNetwork(network *cniNetwork, podName string, podNamespace string, podSandboxID kubecontainer.ContainerID, podNetnsPath string) error {
rt, err := plugin.buildCNIRuntimeConf(podName, podNamespace, podSandboxID, podNetnsPath)
if err != nil {
glog.Errorf("Error deleting network when building cni runtime conf: %v", err)
return err
}
netConf, cniNet := network.NetworkConfig, network.CNIConfig
glog.V(4).Infof("About to del CNI network %v (type=%v)", netConf.Name, netConf.Plugins[0].Network.Type)
err = cniNet.DelNetworkList(netConf, rt)
if err != nil {
glog.Errorf("Error deleting network: %v", err)
return err
}
return nil
}
  • github.com/containernetworking/cni/libcni/api.go

// DelNetworkList executes a sequence of plugins with the DEL command
func (c *CNIConfig) DelNetworkList(list *NetworkConfigList, rt *RuntimeConf) error {
for i := len(list.Plugins) - 1; i >= 0; i-- {
net := list.Plugins[i]
pluginPath, err := invoke.FindInPath(net.Network.Type, c.Path)
if err != nil {
return err
}
newConf, err := buildOneConfig(list, net, nil, rt)
if err != nil {
return err
}
// 调用插件删除网络
if err := invoke.ExecPluginWithoutResult(pluginPath, newConf.Bytes, c.args("DEL", rt)); err != nil {
return err
}
}
return nil
}

参考