K
K
Kumu's wiki
搜索文档…
Kubelet CNI 源码解析
    cmd/kubelet/kubelet.go
    pkg/kubelet/kubelet.go
注: 基于 Kubernetes release-1.9

网络

Kubernetes 容器使用的网络规范为 CNI(容器网络接口),CNI 包括方法规范和参数规范。Kubernetes 并不实际去操作容器的网络,而是通过遵循 CNI 规范的各种网络插件去管理容器网络资源,如 Calico、Flannel、Contiv netplugin 等网络插件。

CNI 接口

    github.com/containernetworking/cni/libcni/api.go
CNI 接口只需要实现以下方法,实际就是两种,一个添加网络调用,一个删除调用:
1
// CNI is the interface libcni exposes for attaching a container to a network
// and detaching it again. Each operation exists in two forms: one that takes
// a NetworkConfigList and one that takes a single NetworkConfig.
type CNI interface {
2
// AddNetworkList runs the ADD operation for the plugins listed in net.
AddNetworkList(net *NetworkConfigList, rt *RuntimeConf) (types.Result, error)
3
// DelNetworkList runs the DEL operation for the plugins listed in net.
DelNetworkList(net *NetworkConfigList, rt *RuntimeConf) error
4
5
// AddNetwork is the single-config counterpart of AddNetworkList.
AddNetwork(net *NetworkConfig, rt *RuntimeConf) (types.Result, error)
6
// DelNetwork is the single-config counterpart of DelNetworkList.
DelNetwork(net *NetworkConfig, rt *RuntimeConf) error
7
}
Copied!

网络初始化

Kubelet 启动过程中针对网络主要做以下两个步骤:探测当前环境的网络插件,以及初始化网络插件。

步骤 1:探测当前环境的网络插件

    cmd/kubelet/app/server.go
1
func UnsecuredDependencies(s *options.KubeletServer) (*kubelet.Dependencies, error) {
2
... ...
3
// 执行具体函数,获取当前环境的网络插件
4
NetworkPlugins: ProbeNetworkPlugins(s.CNIConfDir, s.CNIBinDir),
5
... ...
6
}
Copied!
    cmd/kubelet/app/plugins.go
1
// ProbeNetworkPlugins collects all compiled-in network plugins: the plugins
// returned by cni.ProbeNetworkPlugins for cniConfDir/cniBinDir, followed by
// the kubenet plugin built from cniBinDir.
2
func ProbeNetworkPlugins(cniConfDir, cniBinDir string) []network.NetworkPlugin {
3
allPlugins := []network.NetworkPlugin{}
4
5
// for each existing plugin, add to the list
6
allPlugins = append(allPlugins, cni.ProbeNetworkPlugins(cniConfDir, cniBinDir)...)
7
allPlugins = append(allPlugins, kubenet.NewPlugin(cniBinDir))
8
9
return allPlugins
10
}
Copied!
    pkg/kubelet/network/plugins.go
以下是 kubelet NetworkPlugin 接口,pkg/kubelet/network/cni/cni.go 中的 cniNetworkPlugin 实现了这套接口:
1
// NetworkPlugin is the interface the kubelet uses to drive a network plugin.
2
type NetworkPlugin interface {
3
// Init initializes the plugin. This will be called exactly once
4
// before any other methods are called.
5
Init(host Host, hairpinMode kubeletconfig.HairpinMode, nonMasqueradeCIDR string, mtu int) error
6
7
// Event is called on various events like:
8
// NET_PLUGIN_EVENT_POD_CIDR_CHANGE
9
Event(name string, details map[string]interface{})
10
11
// Name returns the plugin's name. This will be used when searching
12
// for a plugin by name.
13
Name() string
14
15
// Capabilities returns a set of NET_PLUGIN_CAPABILITY_*
16
Capabilities() utilsets.Int
17
18
// SetUpPod is the method called after the infra container of
19
// the pod has been created but before the other containers of the
20
// pod are launched.
21
SetUpPod(namespace string, name string, podSandboxID kubecontainer.ContainerID, annotations map[string]string) error
22
23
// TearDownPod is the method called before a pod's infra container will be deleted
24
TearDownPod(namespace string, name string, podSandboxID kubecontainer.ContainerID) error
25
26
// GetPodNetworkStatus is the method called to obtain the ipv4 or ipv6 addresses of the container
27
GetPodNetworkStatus(namespace string, name string, podSandboxID kubecontainer.ContainerID) (*PodNetworkStatus, error)
28
29
// Status returns error if the network plugin is in error state
30
Status() error
31
}
Copied!
    pkg/kubelet/network/cni/cni.go
1
// probeNetworkPluginsWithVendorCNIDirPrefix builds the CNI network plugin for
// the given config directory (pluginDir), binary directory (binDir) and vendor
// directory prefix, performs a best-effort sync of the network config, and
// returns the plugin wrapped in a single-element slice.
func probeNetworkPluginsWithVendorCNIDirPrefix(pluginDir, binDir, vendorCNIDirPrefix string) []network.NetworkPlugin {
2
if binDir == "" {
3
// DefaultCNIDir defaults to `/opt/cni/bin`
4
binDir = DefaultCNIDir
5
}
6
plugin := &cniNetworkPlugin{
7
defaultNetwork: nil,
8
// loNetwork is always configured so that the lo device can be added, which
// means the `loopback` plugin must exist in binDir (the CNI plugin directory).
9
loNetwork: getLoNetwork(binDir, vendorCNIDirPrefix),
10
execer: utilexec.New(),
11
pluginDir: pluginDir,
12
binDir: binDir,
13
vendorCNIDirPrefix: vendorCNIDirPrefix,
14
}
15
16
// sync NetworkConfig in best effort during probing.
17
// Probe the network and sync its config. No error is handled here because
// syncNetworkConfig only logs a warning when it fails.
18
plugin.syncNetworkConfig()
19
// Although a list is returned, only one plugin is supported at runtime.
20
return []network.NetworkPlugin{plugin}
21
}
22
23
// ProbeNetworkPlugins returns the CNI network plugin for the given config
// directory (pluginDir) and binary directory (binDir), using an empty vendor
// CNI directory prefix.
func ProbeNetworkPlugins(pluginDir, binDir string) []network.NetworkPlugin {
24
return probeNetworkPluginsWithVendorCNIDirPrefix(pluginDir, binDir, "")
25
}
26
27
... ...
28
29
// syncNetworkConfig probes for the default CNI network and, on success,
// installs it as the plugin's default network. On failure it only logs a
// warning and returns without touching the current default network.
30
func (plugin *cniNetworkPlugin) syncNetworkConfig() {
31
network, err := getDefaultCNINetwork(plugin.pluginDir, plugin.binDir, plugin.vendorCNIDirPrefix)
32
if err != nil {
33
glog.Warningf("Unable to update cni config: %s", err)
34
return
35
}
36
plugin.setDefaultNetwork(network)
37
}
Copied!
1
// getDefaultCNINetwork returns the first usable CNI network found in
// pluginDir. Config files with the extensions .conf, .conflist and .json are
// examined in sorted order; files that fail to load or validate are skipped
// with a warning. An error is returned when the directory cannot be listed,
// contains no config files, or contains no valid network.
func getDefaultCNINetwork(pluginDir, binDir, vendorCNIDirPrefix string) (*cniNetwork, error) {
2
// pluginDir defaults to `/etc/cni/net.d`
3
if pluginDir == "" {
4
pluginDir = DefaultNetDir
5
}
6
files, err := libcni.ConfFiles(pluginDir, []string{".conf", ".conflist", ".json"})
7
switch {
8
case err != nil:
9
return nil, err
10
case len(files) == 0:
11
return nil, fmt.Errorf("No networks found in %s", pluginDir)
12
}
13
14
sort.Strings(files)
15
// Walk the config files in sorted order and return as soon as one is valid,
// so any additional config files have no effect.
16
for _, confFile := range files {
17
var confList *libcni.NetworkConfigList
18
if strings.HasSuffix(confFile, ".conflist") {
19
confList, err = libcni.ConfListFromFile(confFile)
20
if err != nil {
21
glog.Warningf("Error loading CNI config list file %s: %v", confFile, err)
22
continue
23
}
24
} else {
25
conf, err := libcni.ConfFromFile(confFile)
26
if err != nil {
27
glog.Warningf("Error loading CNI config file %s: %v", confFile, err)
28
continue
29
}
30
// Ensure the config has a "type" so we know what plugin to run.
31
// Also catches the case where somebody put a conflist into a conf file.
32
if conf.Network.Type == "" {
33
glog.Warningf("Error loading CNI config file %s: no 'type'; perhaps this is a .conflist?", confFile)
34
continue
35
}
36
37
// Wrap the single config in a list so both file kinds are handled uniformly below.
confList, err = libcni.ConfListFromConf(conf)
38
if err != nil {
39
glog.Warningf("Error converting CNI config file %s to list: %v", confFile, err)
40
continue
41
}
42
}
43
if len(confList.Plugins) == 0 {
44
glog.Warningf("CNI config list %s has no networks, skipping", confFile)
45
continue
46
}
47
// The type of the first plugin in the list selects the vendor directory.
confType := confList.Plugins[0].Network.Type
48
49
// Search for vendor-specific plugins as well as default plugins in the CNI codebase.
50
vendorDir := vendorCNIDir(vendorCNIDirPrefix, confType)
51
cninet := &libcni.CNIConfig{
52
Path: []string{vendorDir, binDir},
53
}
54
network := &cniNetwork{name: confList.Name, NetworkConfig: confList, CNIConfig: cninet}
55
return network, nil
56
}
57
return nil, fmt.Errorf("No valid networks found in %s", pluginDir)
58
}
Copied!

步骤 2:初始化网络插件

    pkg/kubelet/kubelet.go
1
plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, crOptions.NetworkPluginName, &criNetworkHost{&networkHost{klet}, &network.NoopPortMappingGetter{}}, hairpinMode, nonMasqueradeCIDR, int(crOptions.NetworkPluginMTU))
2
if err != nil {
3
return nil, err
4
}
5
klet.networkPlugin = plug
Copied!
    pkg/kubelet/network/plugins.go
1
// InitNetworkPlugin inits the plugin that matches networkPluginName. Plugins must have unique names.
// With an empty networkPluginName the no-op plugin is initialized and returned.
// Otherwise the chosen plugin (nil when no plugin matches) is returned together
// with an aggregate of every name-validation, duplicate-name, lookup and init
// error collected along the way.
2
func InitNetworkPlugin(plugins []NetworkPlugin, networkPluginName string, host Host, hairpinMode kubeletconfig.HairpinMode, nonMasqueradeCIDR string, mtu int) (NetworkPlugin, error) {
3
// If `--network-plugin` is not specified, the `noop` plugin is used by default;
// to use CNI networking, set the plugin to `cni`.
4
// For details on `noop`, see the official documentation: https://kubernetes.io/docs/concepts/cluster-administration/network-plugins/
5
if networkPluginName == "" {
6
// default to the no_op plugin
7
plug := &NoopNetworkPlugin{}
8
plug.Sysctl = utilsysctl.New()
9
// Initialize the `noop` network plugin
10
if err := plug.Init(host, hairpinMode, nonMasqueradeCIDR, mtu); err != nil {
11
return nil, err
12
}
13
return plug, nil
14
}
15
16
pluginMap := map[string]NetworkPlugin{}
17
18
allErrs := []error{}
19
for _, plugin := range plugins {
20
name := plugin.Name()
21
if errs := validation.IsQualifiedName(name); len(errs) != 0 {
22
allErrs = append(allErrs, fmt.Errorf("network plugin has invalid name: %q: %s", name, strings.Join(errs, ";")))
23
continue
24
}
25
26
if _, found := pluginMap[name]; found {
27
allErrs = append(allErrs, fmt.Errorf("network plugin %q was registered more than once", name))
28
continue
29
}
30
pluginMap[name] = plugin
31
}
32
33
// Check whether a registered plugin matches the requested name; if so, initialize it.
34
chosenPlugin := pluginMap[networkPluginName]
35
if chosenPlugin != nil {
36
err := chosenPlugin.Init(host, hairpinMode, nonMasqueradeCIDR, mtu)
37
if err != nil {
38
allErrs = append(allErrs, fmt.Errorf("Network plugin %q failed init: %v", networkPluginName, err))
39
} else {
40
glog.V(1).Infof("Loaded network plugin %q", networkPluginName)
41
}
42
} else {
43
allErrs = append(allErrs, fmt.Errorf("Network plugin %q not found.", networkPluginName))
44
}
45
46
return chosenPlugin, utilerrors.NewAggregate(allErrs)
47
}
Copied!
上文 hairpinMode 用于设置 hairpin NAT 方式,使得服务后端 endpoints 访问服务自身时能够负载到本地,配置项为 --hairpin-mode,默认值为 promiscuous-bridge。
    pkg/kubelet/network/cni/cni.go
1
// Init prepares the CNI plugin for use: it runs the platform-specific
// initialization, records the host, and performs a sync of the network
// config. Only a platformInit failure is returned as an error; hairpinMode,
// nonMasqueradeCIDR and mtu are not used in this body.
func (plugin *cniNetworkPlugin) Init(host network.Host, hairpinMode kubeletconfig.HairpinMode, nonMasqueradeCIDR string, mtu int) error {
2
// platformInit checks whether the host has the `nsenter` command
3
err := plugin.platformInit()
4
if err != nil {
5
return err
6
}
7
8
plugin.host = host
9
10
// syncNetworkConfig returns nothing, so a failed sync does not fail Init.
plugin.syncNetworkConfig()
11
return nil
12
}
Copied!

网络操作

网络操作主要包括 Pod 创建时的网络添加,以及 Pod 删除时的网络回收。上文中介绍了 NetworkPlugin 接口,其中包含了添加网络和删除网络的方法:
    pkg/kubelet/network/plugins.go
1
// Plugin is an interface to network plugins for the kubelet
2
type NetworkPlugin interface {
3
... ...
4
5
// SetUpPod is the method called after the infra container of
6
// the pod has been created but before the other containers of the
7
// pod are launched.
8
SetUpPod(namespace string, name string, podSandboxID kubecontainer.ContainerID, annotations map[string]string) error
9
10
// TearDownPod is the method called before a pod's infra container will be deleted
11
TearDownPod(namespace string, name string, podSandboxID kubecontainer.ContainerID) error
12
... ...
13
}
Copied!
以下为 Kubelet 调用 CNI 网络的具体操作实现:

添加网络

    pkg/kubelet/network/cni/cni.go
1
// SetUpPod attaches the pod sandbox identified by id to the network: it
// resolves the sandbox's network namespace path, adds the loopback network
// when one is configured, and then adds the plugin's default network. Any
// failure is returned to the caller; annotations is not used in this body.
func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error {
2
if err := plugin.checkInitialized(); err != nil {
3
return err
4
}
5
// GetNetNS() returns the path of the container's net namespace; with
// dockershim it has the form `/proc/<pid>/ns/net`, see
6
// pkg/kubelet/dockershim/helpers_linux.go `getNetworkNamespace`
7
netnsPath, err := plugin.host.GetNetNS(id.ID)
8
if err != nil {
9
return fmt.Errorf("CNI failed to retrieve network namespace path: %v", err)
10
}
11
12
// Windows doesn't have loNetwork. It comes only with Linux
13
// Create the lo interface for the container
14
if plugin.loNetwork != nil {
15
if _, err = plugin.addToNetwork(plugin.loNetwork, name, namespace, id, netnsPath); err != nil {
16
glog.Errorf("Error while adding to cni lo network: %s", err)
17
return err
18
}
19
}
20
21
_, err = plugin.addToNetwork(plugin.getDefaultNetwork(), name, namespace, id, netnsPath)
22
if err != nil {
23
glog.Errorf("Error while adding to cni network: %s", err)
24
return err
25
}
26
27
// err is nil here: every failure above has already returned.
return err
28
}
29
30
... ...
31
32
// addToNetwork builds the CNI runtime configuration for the pod sandbox and
// calls AddNetworkList on the given network's CNIConfig with its
// NetworkConfig. It returns the result of that call, or the first error hit
// while building the runtime configuration or adding the network.
func (plugin *cniNetworkPlugin) addToNetwork(network *cniNetwork, podName string, podNamespace string, podSandboxID kubecontainer.ContainerID, podNetnsPath string) (cnitypes.Result, error) {
33
rt, err := plugin.buildCNIRuntimeConf(podName, podNamespace, podSandboxID, podNetnsPath)
34
if err != nil {
35
glog.Errorf("Error adding network when building cni runtime conf: %v", err)
36
return nil, err
37
}
38
39
netConf, cniNet := network.NetworkConfig, network.CNIConfig
40
glog.V(4).Infof("About to add CNI network %v (type=%v)", netConf.Name, netConf.Plugins[0].Network.Type)
41
res, err := cniNet.AddNetworkList(netConf, rt)
42
if err != nil {
43
glog.Errorf("Error adding network: %v", err)
44
return nil, err
45
}
46
47
return res, nil
48
}
Copied!
    github.com/containernetworking/cni/libcni/api.go
1
// AddNetworkList executes a sequence of plugins with the ADD command
// Plugins run in list order, and each plugin's result is passed to
// buildOneConfig for the next one. The first error aborts the sequence;
// otherwise the last plugin's result is returned.
2
func (c *CNIConfig) AddNetworkList(list *NetworkConfigList, rt *RuntimeConf) (types.Result, error) {
3
var prevResult types.Result
4
for _, net := range list.Plugins {
5
// Locate the plugin binary named after the config's type in c.Path.
pluginPath, err := invoke.FindInPath(net.Network.Type, c.Path)
6
if err != nil {
7
return nil, err
8
}
9
10
newConf, err := buildOneConfig(list, net, prevResult, rt)
11
if err != nil {
12
return nil, err
13
}
14
15
// Invoke the plugin to add the network
16
prevResult, err = invoke.ExecPluginWithResult(pluginPath, newConf.Bytes, c.args("ADD", rt))
17
if err != nil {
18
return nil, err
19
}
20
}
21
22
return prevResult, nil
23
}
Copied!

删除网络

    pkg/kubelet/network/cni/cni.go
1
// TearDownPod removes the pod sandbox identified by id from the plugin's
// default network. Failing to resolve the network namespace path is only
// logged; the delete is still attempted with whatever path was returned.
func (plugin *cniNetworkPlugin) TearDownPod(namespace string, name string, id kubecontainer.ContainerID) error {
2
if err := plugin.checkInitialized(); err != nil {
3
return err
4
}
5
6
// Lack of namespace should not be fatal on teardown
7
netnsPath, err := plugin.host.GetNetNS(id.ID)
8
if err != nil {
9
glog.Warningf("CNI failed to retrieve network namespace path: %v", err)
10
}
11
12
return plugin.deleteFromNetwork(plugin.getDefaultNetwork(), name, namespace, id, netnsPath)
13
}
14
... ...
15
16
// deleteFromNetwork builds the CNI runtime configuration for the pod sandbox
// and calls DelNetworkList on the given network's CNIConfig with its
// NetworkConfig. It returns the first error hit while building the runtime
// configuration or deleting the network, or nil on success.
func (plugin *cniNetworkPlugin) deleteFromNetwork(network *cniNetwork, podName string, podNamespace string, podSandboxID kubecontainer.ContainerID, podNetnsPath string) error {
17
rt, err := plugin.buildCNIRuntimeConf(podName, podNamespace, podSandboxID, podNetnsPath)
18
if err != nil {
19
glog.Errorf("Error deleting network when building cni runtime conf: %v", err)
20
return err
21
}
22
23
netConf, cniNet := network.NetworkConfig, network.CNIConfig
24
glog.V(4).Infof("About to del CNI network %v (type=%v)", netConf.Name, netConf.Plugins[0].Network.Type)
25
err = cniNet.DelNetworkList(netConf, rt)
26
if err != nil {
27
glog.Errorf("Error deleting network: %v", err)
28
return err
29
}
30
return nil
31
}
Copied!
    github.com/containernetworking/cni/libcni/api.go
1
// DelNetworkList executes a sequence of plugins with the DEL command
// Plugins run in reverse list order and no previous result is passed to
// buildOneConfig. The first error aborts the sequence.
2
func (c *CNIConfig) DelNetworkList(list *NetworkConfigList, rt *RuntimeConf) error {
3
// Walk the plugin list from last to first.
for i := len(list.Plugins) - 1; i >= 0; i-- {
4
net := list.Plugins[i]
5
6
pluginPath, err := invoke.FindInPath(net.Network.Type, c.Path)
7
if err != nil {
8
return err
9
}
10
11
newConf, err := buildOneConfig(list, net, nil, rt)
12
if err != nil {
13
return err
14
}
15
16
// Invoke the plugin to delete the network
17
if err := invoke.ExecPluginWithoutResult(pluginPath, newConf.Bytes, c.args("DEL", rt)); err != nil {
18
return err
19
}
20
}
21
22
return nil
23
}
Copied!

参考

最近更新 11mo ago