国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Kubelet源碼分析(一):啟動流程分析

mindwind / 2942人閱讀

摘要:源碼版本簡介在急群眾,在每個節點上都會啟動一個服務進程。該進程用于處理節點下發到本節點的任務,管理及中的容器。每個進程會在上注冊節點自身信息,定期向節點匯報節點資源的使用情況,并通過監控容器和節點資源。最后運行健康檢測服務。

源碼版本

kubernetes version: v1.3.0

簡介

在Kubernetes急群眾,在每個Node節點上都會啟動一個kubelet服務進程。該進程用于處理Master節點下發到本節點的任務,管理Pod及Pod中的容器。每個Kubelet進程會在APIServer上注冊節點自身信息,定期向Master節點匯報節點資源的使用情況,并通過cAdvise監控容器和節點資源。

關鍵結構 KubeletConfiguration
type KubeletConfiguration struct {
    // kubelet的參數配置文件
    Config string `json:"config"`
    // kubelet支持三種源數據:
    // 1. ApiServer: kubelet通過ApiServer監聽etcd目錄,同步Pod清單
    // 2. file: 通過kubelet啟動參數"--config"指定配置文件目錄下的文件
    // 3. http URL: 通過"--manifest-url"參數設置
    // 所以下面會有三種同步的頻率配置
    // 同步容器和配置的頻率。
    SyncFrequency unversioned.Duration `json:"syncFrequency"`
    // 文件檢查頻率
    FileCheckFrequency unversioned.Duration `json:"fileCheckFrequency"`
    // Http模式檢查頻率
    HTTPCheckFrequency unversioned.Duration `json:"httpCheckFrequency"`
    // 該參數設置HTTP模式下的endpoint
    ManifestURL string `json:"manifestURL"`

    ManifestURLHeader string `json:"manifestURLHeader"`
    // 是否需要開啟kubelet Server,就是指下列的10250端口
    EnableServer bool `json:"enableServer"`
    // kubelet服務地址
    Address string `json:"address"`
    // kubelet服務端口,默認10250
    // 別的服務端口如下:
    // -->Scheduler服務端口:10251
    // -->ControllerManagerPort: 10252
    Port uint `json:"port"`

    // kubelet服務的只讀端口,沒有任何認證(0:disable)。默認為10255
    // 該功能只要配置端口,就必定開啟服務
    ReadOnlyPort uint `json:"readOnlyPort"`

    // 證書相關:
    TLSCertFile string `json:"tLSCertFile"`

    TLSPrivateKeyFile string `json:"tLSPrivateKeyFile"`

    CertDirectory string `json:"certDirectory"`
    // 用于識別kubelet的hostname,代替實際的hostname
    HostnameOverride string `json:"hostnameOverride"`
    // 指定創建Pod時的基礎鏡像
    PodInfraContainerImage string `json:"podInfraContainerImage"`
    // 配置kubelet需要交互的docker的endpoint
    // 比如:unix:///var/run/docker.sock, 這個是默認的Linux配置
    DockerEndpoint string `json:"dockerEndpoint"`
    // kubelet的volume、mounts、配置目錄路徑
    // 默認是/var/lib/kubelet
    RootDirectory string `json:"rootDirectory"`
    
    SeccompProfileRoot string `json:"seccompProfileRoot"`
    // 是否允許root權限
    AllowPrivileged bool `json:"allowPrivileged"`
    // kubelet允許pods使用的資源:主機的Network、PID、IPC
    // 默認都是kubetypes.AllSource,即所有資源"*"
    HostNetworkSources string `json:"hostNetworkSources"`

    HostPIDSources string `json:"hostPIDSources"`

    HostIPCSources string `json:"hostIPCSources"`
    // 限制從鏡像倉庫拉取鏡像的速度, 0:unlimited; 5.0: default
    RegistryPullQPS float64 `json:"registryPullQPS"`
    // 從鏡像倉庫拉取鏡像允許產生的爆發值
    RegistryBurst int32 `json:"registryBurst"`
    // 限制每秒產生的events最大數量
    EventRecordQPS float32 `json:"eventRecordQPS"`
    // 允許產生events的爆發值
    EventBurst int32 `json:"eventBurst"`
    // 使能debug模式,進行log收集和本地允許容器和命令
    EnableDebuggingHandlers bool `json:"enableDebuggingHandlers"`
    // 容器被回收之前存在的最小時間,在這時間之前是不允許被回收的
    MinimumGCAge unversioned.Duration `json:"minimumGCAge"`
    // Pod中允許存在Container的最大數量,默認是2
    MaxPerPodContainerCount int32 `json:"maxPerPodContainerCount"`
    // 該節點上允許存在的最大container數量,默認是240
    MaxContainerCount int32 `json:"maxContainerCount"`
    // cAdvisor服務端口,默認是4194
    CAdvisorPort uint `json:"cAdvisorPort"`
    // 健康檢測端口,默認是10248
    HealthzPort int32 `json:"healthzPort"`
    // 健康檢測綁定地址,默認是“127.0.0.1”
    HealthzBindAddress string `json:"healthzBindAddress"`
    // kubelet進程的oom-score-adj值,范圍:[-1000, 1000]
    OOMScoreAdj int32 `json:"oomScoreAdj"`
    // 是否自動向Apiserver注冊
    RegisterNode bool `json:"registerNode"`
    
    ClusterDomain string `json:"clusterDomain"`

    MasterServiceNamespace string `json:"masterServiceNamespace"`
    // 集群DNS的IP,kubelet將配置所有的containers去使用該DNS
    ClusterDNS string `json:"clusterDNS"`
    // 流連接的超時時間
    StreamingConnectionIdleTimeout unversioned.Duration `json:"streamingConnectionIdleTimeout"`
    // Node狀態更新頻率,該值需要和nodeController中的nodeMonitorGracePeriod一起作用
    // 設置kubelet每隔多少時間向APIServer匯報節點狀態,默認為10s
    NodeStatusUpdateFrequency unversioned.Duration `json:"nodeStatusUpdateFrequency"`
    // 設置鏡像被回收之前存在的最短時間,在這時間之前是不會被回收
    ImageMinimumGCAge unversioned.Duration `json:"imageMinimumGCAge"`
    // 磁盤占用率超過該值后,鏡像垃圾回收進程將一直運行
    ImageGCHighThresholdPercent int32 `json:"imageGCHighThresholdPercent"`
    // 磁盤占用率低于該值,鏡像垃圾回收進程將不運行
    ImageGCLowThresholdPercent int32 `json:"imageGCLowThresholdPercent"`
    // 磁盤空間的保留大小,當低于該值時,Pods將不能再創建
    LowDiskSpaceThresholdMB int32 `json:"lowDiskSpaceThresholdMB"`
    // 計算所有Pods和緩存容量的磁盤使用情況的頻率
    VolumeStatsAggPeriod unversioned.Duration `json:"volumeStatsAggPeriod"`
    // Network和volume的插件相關
    NetworkPluginName string `json:"networkPluginName"`

    NetworkPluginDir string `json:"networkPluginDir"`

    VolumePluginDir string `json:"volumePluginDir"`

    CloudProvider string `json:"cloudProvider,omitempty"`

    CloudConfigFile string `json:"cloudConfigFile,omitempty"`
    // 一個cgroups的名字,用于隔離kubelet   ????為啥要隔離?單節點支持多個kubelet??
    KubeletCgroups string `json:"kubeletCgroups,omitempty"`
    // 用于隔離容器運行時(Docker、Rkt)的cgroups
    RuntimeCgroups string `json:"runtimeCgroups,omitempty"`

    SystemCgroups string `json:"systemContainer,omitempty"`
    
    CgroupRoot string `json:"cgroupRoot,omitempty"`
    // ???
    ContainerRuntime string `json:"containerRuntime"`
    // 設置所有的runtime請求的超時時間(如:pull、logs、exec、attach),除了那些長時間運行的任務
    RuntimeRequestTimeout unversioned.Duration `json:"runtimeRequestTimeout,omitempty"`
    // rkt執行文件的路徑
    RktPath string `json:"rktPath,omitempty"`
    // rkt通訊端點
    RktAPIEndpoint string `json:"rktAPIEndpoint,omitempty"`
    
    RktStage1Image string `json:"rktStage1Image,omitempty"`
    // kubelet文件鎖,用于與別的kubelet進行同步
    LockFilePath string `json:"lockFilePath"`
    
    ExitOnLockContention bool `json:"exitOnLockContention"`
    // 基于Node.Spec.PodCIDR來配置網卡cbr0
    ConfigureCBR0 bool `json:"configureCbr0"`
    // 配置網絡模式, promiscuous-bridge、hairpin-veth、none
    HairpinMode string `json:"hairpinMode"`
    // 表示該節點已經有監控docker和kubelet的程序
    BabysitDaemons bool `json:"babysitDaemons"`
    // 該kubelet下能運行的最大Pods數量
    MaxPods int32 `json:"maxPods"`
    
    NvidiaGPUs int32 `json:"nvidiaGPUs"`
    // 容器命令執行的Handler,通過字符串來配置不同的Handler
    // 可配置:"native" or "nsender",default: "native"
    DockerExecHandlerName string `json:"dockerExecHandlerName"`
    // 這個CIDR用于分配Pod IP地址,只作用在standalone模式
    PodCIDR string `json:"podCIDR"`
    // 配置容器的DNS解析文件,默認是"/etc/resolv.conf"
    ResolverConfig string `json:"resolvConf"`
    // 使能容器的CPU配額功能
    CPUCFSQuota bool `json:"cpuCFSQuota"`
    // 如果kubelet運行在容器中的話,需要把該值設置為true
    // kubelet運行在主機上和容器里會有差異:
    // 在主機上的話,寫文件數據沒有什么限制,直接調用ioutil.WriteFile()接口就OK
    // 在容器里的話,如果kubelet要寫數據到它所創建的容器的話,就得使用nsender進入到
    // 容器對應的namespace中,然后寫數據
    Containerized bool `json:"containerized"`
    // kubelet進程可以打開的最大文件數
    MaxOpenFiles uint64 `json:"maxOpenFiles"`
    // 由apiServer指定CIDR
    ReconcileCIDR bool `json:"reconcileCIDR"`
    // 指定kubelet將它所在的Node注冊到Apiserver,為Schedulable
    RegisterSchedulable bool `json:"registerSchedulable"`
    // kubelet發送給apiServer的請求的正文類型,default:"application/vnd.kubernetes.protobuf"
    ContentType string `json:"contentType"`
    // kubelet和apiServer交互所設定的QPS
    KubeAPIQPS float32 `json:"kubeAPIQPS"`
    // kubelet與apiServer交互允許產生的爆發值
    KubeAPIBurst int32 `json:"kubeAPIBurst"`
    // 設置為true的話,告訴kubelet串行的去pull image
    SerializeImagePulls bool `json:"serializeImagePulls"`
    // 使能Flannel網絡來啟動kubelet,該前提是默認Flannel已經啟動了
    ExperimentalFlannelOverlay bool `json:"experimentalFlannelOverlay"`
    // Node可能會出于out-of-disk的狀態(磁盤空間不足),kubelet需要定時查詢node狀態
    // 所以該值就是定時查詢的頻率
    OutOfDiskTransitionFrequency unversioned.Duration `json:"outOfDiskTransitionFrequency,omitempty"`
    // kubelet所在節點的IP.如果該值有設置,那么kubelet會把該值設置到node上
    NodeIP string `json:"nodeIP,omitempty"`
    // 該Node的Labels
    NodeLabels map[string]string `json:"nodeLabels"`

    NonMasqueradeCIDR string `json:"nonMasqueradeCIDR"`
    
    EnableCustomMetrics bool `json:"enableCustomMetrics"`
    // 以下幾個都跟回收策略有關,詳細的需要查看代碼實現。
    // 用逗號分隔的回收資源的條件表達式
    // 參考: https://kubernetes.io/docs/admin/out-of-resource/
    EvictionHard string `json:"evictionHard,omitempty"`
    
    EvictionSoft string `json:"evictionSoft,omitempty"`
    
    EvictionSoftGracePeriod string `json:"evictionSoftGracePeriod,omitempty"`

    EvictionPressureTransitionPeriod unversioned.Duration `json:"evictionPressureTransitionPeriod,omitempty"`
    
    EvictionMaxPodGracePeriod int32 `json:"evictionMaxPodGracePeriod,omitempty"`
    // 設置每個核最大的Pods數量
    PodsPerCore int32 `json:"podsPerCore"`
    // 是否使能kubelet attach/detach的功能
    EnableControllerAttachDetach bool `json:"enableControllerAttachDetach"`
}
Kubelet啟動流程 main 入口

main入口: cmd/kubelet/kubelet.go
Main源碼如下:

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())
    s := options.NewKubeletServer()
    s.AddFlags(pflag.CommandLine)

    flag.InitFlags()
    util.InitLogs()
    defer util.FlushLogs()

    verflag.PrintAndExitIfRequested()

    if err := app.Run(s, nil); err != nil {
        fmt.Fprintf(os.Stderr, "%v
", err)
        os.Exit(1)
    }
}

有看過源碼的同學,應該會發現kubernetes所有執行程序的入口函數風格都差不多一致。
options.NewKubeletServer(): 創建了一個KubeletServer結構,并進行了默認值的初始化。
接口如下:

func NewKubeletServer() *KubeletServer {
    return &KubeletServer{
...
        KubeletConfiguration: componentconfig.KubeletConfiguration{
            Address:                      "0.0.0.0",
            CAdvisorPort:                 4194,
            VolumeStatsAggPeriod:         unversioned.Duration{Duration: time.Minute},
            CertDirectory:                "/var/run/kubernetes",
            CgroupRoot:                   "",
            CloudProvider:                AutoDetectCloudProvider,
            ConfigureCBR0:                false,
            ContainerRuntime:             "docker",
            RuntimeRequestTimeout:        unversioned.Duration{Duration: 2 * time.Minute},
            CPUCFSQuota:                  true,
...
}

s.AddFlags(pflag.CommandLine): 該接口用于從kubelet命令行獲取參數。
接口如下:

func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
    fs.StringVar(&s.Config, "config", s.Config, "Path to the config file or directory of files")
    fs.DurationVar(&s.SyncFrequency.Duration, "sync-frequency", s.SyncFrequency.Duration, "Max period between synchronizing running containers and config")
    fs.DurationVar(&s.FileCheckFrequency.Duration, "file-check-frequency", s.FileCheckFrequency.Duration, "Duration between checking config files for new data")
...
}

命令行參數獲取完之后,就是進行日志等的初始化。
verflag.PrintAndExitIfRequested(): 判斷了參數是否是help,是的話直接打印help信息,然后退出。
最后就進入到關鍵函數app.Run(s, nil)。

app.Run()

Run入口: cmd/kubelet/app/server.go
該接口的代碼很長,其實主要也是做了一些準備工作,先來看下參數配置的過程。
代碼如下:

func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) {
...
    // 可以看到app.Run()進來的時候,kcfg=nil
    if kcfg == nil {
        // UnsecuredKubeletConfig()返回一個有效的KubeConfig
        cfg, err := UnsecuredKubeletConfig(s)
        if err != nil {
            return err
        }
        kcfg = cfg
        // 初始化一個Config,用來與APIServer交互
        clientConfig, err := CreateAPIServerClientConfig(s)
        if err == nil {
            // 用于創建各類client: 核心client、認證client、授權client...
            kcfg.KubeClient, err = clientset.NewForConfig(clientConfig)
            // 創建一個events的client
            // make a separate client for events
            eventClientConfig := *clientConfig
            eventClientConfig.QPS = s.EventRecordQPS
            eventClientConfig.Burst = int(s.EventBurst)
            kcfg.EventClient, err = clientset.NewForConfig(&eventClientConfig)
        }
...
    }

    // 創建了一個cAdvisor對象,用于獲取各類資源信息
    // 其中有部分接口還未支持
    if kcfg.CAdvisorInterface == nil {
        kcfg.CAdvisorInterface, err = cadvisor.New(s.CAdvisorPort, kcfg.ContainerRuntime)
        if err != nil {
            return err
        }
    }
    // kubelet的容器管理模塊
    if kcfg.ContainerManager == nil {
        if kcfg.SystemCgroups != "" && kcfg.CgroupRoot == "" {
            return fmt.Errorf("invalid configuration: system container was specified and cgroup root was not specified")
        }

        kcfg.ContainerManager, err = cm.NewContainerManager(kcfg.Mounter, kcfg.CAdvisorInterface, cm.NodeConfig{
            RuntimeCgroupsName: kcfg.RuntimeCgroups,
            SystemCgroupsName:  kcfg.SystemCgroups,
            KubeletCgroupsName: kcfg.KubeletCgroups,
            ContainerRuntime:   kcfg.ContainerRuntime,
        })
        if err != nil {
            return err
        }
    }
...
    // 配置系統OOM參數
    // TODO(vmarmol): Do this through container config.
    oomAdjuster := kcfg.OOMAdjuster
    if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
        glog.Warning(err)
    }

    // 繼續接下去的kubelet運行步驟
    if err := RunKubelet(kcfg); err != nil {
        return err
    }

    // kubelet的監控檢測
    if s.HealthzPort > 0 {
        healthz.DefaultHealthz()
        go wait.Until(func() {
            err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), nil)
            if err != nil {
                glog.Errorf("Starting health server failed: %v", err)
            }
        }, 5*time.Second, wait.NeverStop)
    }

    if s.RunOnce {
        return nil
    }

    <-done
    return nil
}

該接口主要準備了一個KubeletConfig結構,調用UnsecuredKubeletConfig()接口進行創建。
然后還創建了一些該結構中的kubeClient、EventClient、CAdvisorInterface、ContainerManager、oomAdjuster等對象。
然后調用了RunKubelet()接口,走接下去的服務運行流程。
最后運行健康檢測服務。

下面挑關鍵的接口進行介紹:

UnsecuredKubeletConfig()接口
func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) {
。。。
    // kubelet可能會以容器的方式部署,需要配置標準輸出
    mounter := mount.New()
    var writer io.Writer = &io.StdWriter{}
    if s.Containerized {
        glog.V(2).Info("Running kubelet in containerized mode (experimental)")
        mounter = mount.NewNsenterMounter()
        writer = &io.NsenterWriter{}
    }

    // 配置kubelet的TLS
    tlsOptions, err := InitializeTLS(s)
    if err != nil {
        return nil, err
    }
    
    // kubelet有兩種部署方式: 直接運行在物理機上,還有一種是通過容器部署。
    // 若部署到容器中,就會有namespace隔離的問題,導致kubelet無法訪問docker容器的
    // namespace并且docker exec運行命令。
    // 所以這里會進行判斷,如果運行在容器中的話,就需要用到nsenter,它可以協助kubelet
    // 到指定的namespace運行命令。
    // nsenter參考資料: https://github.com/jpetazzo/nsenter
    var dockerExecHandler dockertools.ExecHandler
    switch s.DockerExecHandlerName {
    case "native":
        dockerExecHandler = &dockertools.NativeExecHandler{}
    case "nsenter":
        dockerExecHandler = &dockertools.NsenterExecHandler{}
    default:
        glog.Warningf("Unknown Docker exec handler %q; defaulting to native", s.DockerExecHandlerName)
        dockerExecHandler = &dockertools.NativeExecHandler{}
    }
    
    // k8s對image的回收管理策略
    // MinAge: 表示鏡像存活的最小時間,只有在這之后才能回收該鏡像
    // HighThresholdPercent: 磁盤占用超過該值后,GC一直開啟
    // LowThresholdPercent: 磁盤占用低于該值的話,GC不開啟
    imageGCPolicy := kubelet.ImageGCPolicy{
        MinAge:               s.ImageMinimumGCAge.Duration,
        HighThresholdPercent: int(s.ImageGCHighThresholdPercent),
        LowThresholdPercent:  int(s.ImageGCLowThresholdPercent),
    }
    // k8s根據磁盤空間配置策略
    // DockerFreeDiskMB: 磁盤可用空間低于該值時,pod將無法再在該節點創建,也是指該磁盤需要保留的空間大小
    diskSpacePolicy := kubelet.DiskSpacePolicy{
        DockerFreeDiskMB: int(s.LowDiskSpaceThresholdMB),
        RootFreeDiskMB:   int(s.LowDiskSpaceThresholdMB),
    }

。。。
    // k8s v1.3引入的功能。Eviction用于k8s集群提前感知節點memory/disk負載情況,來調度資源。
    thresholds, err := eviction.ParseThresholdConfig(s.EvictionHard, s.EvictionSoft, s.EvictionSoftGracePeriod)
    if err != nil {
        return nil, err
    }
    evictionConfig := eviction.Config{
        PressureTransitionPeriod: s.EvictionPressureTransitionPeriod.Duration,
        MaxPodGracePeriodSeconds: int64(s.EvictionMaxPodGracePeriod),
        Thresholds:               thresholds,
    }
    // 初始化KubeletConfig結構
    return &KubeletConfig{
        Address:                      net.ParseIP(s.Address),
        AllowPrivileged:              s.AllowPrivileged,
        Auth:                         nil, // default does not enforce auth[nz]
。。。
    }, nil
}

這段代碼中,個人覺得有幾個點比較值得了解下:

該接口中會涉及到kubelet跑在物理機上還是容器中。
如果運行在容器中,會存在namespace權限的問題,需要通過nsenter來操作docker容器。

kubelet提供了參數"--docker-exec-handler"(即DockerExecHandlerName),來配置是否使用nsenter.
Nsenter功能可以了解下。

還有一個kubelet Eviction功能。該功能是k8s v1.3.0新引入的功能,eviction功能就是在節點超負荷之前,提前不讓Pod進行創建,主要就是針對memory和disk。
之前的版本是不會提前感知集群的節點負荷,當內存吃緊時,k8s只依靠內核的OOM Killer、磁盤定期對image和container進行垃圾回收功能,這樣對于Pod有不確定性。eviction很好的解決了該問題,可以在kubelet啟動時指定memory/disk等參數,來保證節點穩定工作,讓集群提前感知節點負荷。

根據kubeconfig創建client

創建client會有兩步:

調用CreateAPIServerClientConfig()進行Config初始化

調用clientset.NewForConfig()根據之前初始化的Config,創建各類Client。

CreateAPIServerClientConfig()接口如下:

func CreateAPIServerClientConfig(s *options.KubeletServer) (*restclient.Config, error) {    
    // 檢查APIServer是否有配置
    if len(s.APIServerList) < 1 {
        return nil, fmt.Errorf("no api servers specified")
    }
    // 檢查是否配置了多個APIServer,新版本已經支持多APIServer的HA
    // 現在默認是用第一個Server
    // TODO: adapt Kube client to support LB over several servers
    if len(s.APIServerList) > 1 {
        glog.Infof("Multiple api servers specified.  Picking first one")
    }

    clientConfig, err := createClientConfig(s)
    if err != nil {
        return nil, err
    }

    clientConfig.ContentType = s.ContentType
    // Override kubeconfig qps/burst settings from flags
    clientConfig.QPS = s.KubeAPIQPS
    clientConfig.Burst = int(s.KubeAPIBurst)

    addChaosToClientConfig(s, clientConfig)
    return clientConfig, nil
}

func createClientConfig(s *options.KubeletServer) (*restclient.Config, error) {
    if s.KubeConfig.Provided() && s.AuthPath.Provided() {
        return nil, fmt.Errorf("cannot specify both --kubeconfig and --auth-path")
    }
    if s.KubeConfig.Provided() {
        return kubeconfigClientConfig(s)
    }
    if s.AuthPath.Provided() {
        return authPathClientConfig(s, false)
    }
    // Try the kubeconfig default first, falling back to the auth path default.
    clientConfig, err := kubeconfigClientConfig(s)
    if err != nil {
        glog.Warningf("Could not load kubeconfig file %s: %v. Trying auth path instead.", s.KubeConfig, err)
        return authPathClientConfig(s, true)
    }
    return clientConfig, nil
}

// 就是這邊默認指定了第一個APIServer
func kubeconfigClientConfig(s *options.KubeletServer) (*restclient.Config, error) {
    return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
        &clientcmd.ClientConfigLoadingRules{ExplicitPath: s.KubeConfig.Value()},
        &clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: s.APIServerList[0]}}).ClientConfig()
}

創建Config成功之后,便調用clientset.NewForConfig()創建各類Clients:

func NewForConfig(c *restclient.Config) (*Clientset, error) {
    // 配置Client連接限制
    configShallowCopy := *c
    if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {
        configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)
    }
    var clientset Clientset
    var err error
    // 創建核心Client
    clientset.CoreClient, err = unversionedcore.NewForConfig(&configShallowCopy)
    if err != nil {
        return nil, err
    }
    // 創建第三方Client
    clientset.ExtensionsClient, err = unversionedextensions.NewForConfig(&configShallowCopy)
    if err != nil {
        return nil, err
    }
    // 創建自動伸縮Client
    clientset.AutoscalingClient, err = unversionedautoscaling.NewForConfig(&configShallowCopy)
    if err != nil {
        return nil, err
    }
    // 創建批量操作的Client
    clientset.BatchClient, err = unversionedbatch.NewForConfig(&configShallowCopy)
    if err != nil {
        return nil, err
    }
    // 創建Rbac Client (RBAC:基于角色的訪問控制)
    // 跟k8s的認證授權有關,可以參考: https://kubernetes.io/docs/admin/authorization/
    clientset.RbacClient, err = unversionedrbac.NewForConfig(&configShallowCopy)
    if err != nil {
        return nil, err
    }
    // 創建服務發現Client
    clientset.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(&configShallowCopy)
    if err != nil {
        glog.Errorf("failed to create the DiscoveryClient: %v", err)
        return nil, err
    }
    return &clientset, nil
}

上面的各種客戶端實際就是api rest請求的客戶端。

RunKubelet

上面的各類創建及初始化完之后,便進入下一步驟RunKubelet:

func RunKubelet(kcfg *KubeletConfig) error {
...
    // k8s event對象創建,用于kubelet向APIServer發送管理容器相關的各類events
    // 后面會多帶帶介紹k8s events功能,這里不再展開細講
    eventBroadcaster := record.NewBroadcaster()
    kcfg.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: kcfg.NodeName})
    eventBroadcaster.StartLogging(glog.V(3).Infof)
    if kcfg.EventClient != nil {
        glog.V(4).Infof("Sending events to api server.")
        eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kcfg.EventClient.Events("")})
    } else {
        glog.Warning("No api server defined - no events will be sent to API server.")
    }
    // 配置capabilities
    privilegedSources := capabilities.PrivilegedSources{
        HostNetworkSources: kcfg.HostNetworkSources,
        HostPIDSources:     kcfg.HostPIDSources,
        HostIPCSources:     kcfg.HostIPCSources,
    }
    capabilities.Setup(kcfg.AllowPrivileged, privilegedSources, 0)

    credentialprovider.SetPreferredDockercfgPath(kcfg.RootDirectory)
    // 調用CreateAndInitKubelet()接口,進行各類初始化
    builder := kcfg.Builder
    if builder == nil {
        builder = CreateAndInitKubelet
    }
    if kcfg.OSInterface == nil {
        kcfg.OSInterface = kubecontainer.RealOS{}
    }
    k, podCfg, err := builder(kcfg)
    if err != nil {
        return fmt.Errorf("failed to create kubelet: %v", err)
    }
    // 設置kubelet進程自身最大能打開的文件句柄數
    util.ApplyRLimitForSelf(kcfg.MaxOpenFiles)

    // TODO(dawnchen): remove this once we deprecated old debian containervm images.
    // This is a workaround for issue: https://github.com/opencontainers/runc/issues/726
    // The current chosen number is consistent with most of other os dist.
    const maxkeysPath = "/proc/sys/kernel/keys/root_maxkeys"
    const minKeys uint64 = 1000000
    key, err := ioutil.ReadFile(maxkeysPath)
    if err != nil {
        glog.Errorf("Cannot read keys quota in %s", maxkeysPath)
    } else {
        fields := strings.Fields(string(key))
        nkey, _ := strconv.ParseUint(fields[0], 10, 64)
        if nkey < minKeys {
            glog.Infof("Setting keys quota in %s to %d", maxkeysPath, minKeys)
            err = ioutil.WriteFile(maxkeysPath, []byte(fmt.Sprintf("%d", uint64(minKeys))), 0644)
            if err != nil {
                glog.Warningf("Failed to update %s: %v", maxkeysPath, err)
            }
        }
    }
    const maxbytesPath = "/proc/sys/kernel/keys/root_maxbytes"
    const minBytes uint64 = 25000000
    bytes, err := ioutil.ReadFile(maxbytesPath)
    if err != nil {
        glog.Errorf("Cannot read keys bytes in %s", maxbytesPath)
    } else {
        fields := strings.Fields(string(bytes))
        nbyte, _ := strconv.ParseUint(fields[0], 10, 64)
        if nbyte < minBytes {
            glog.Infof("Setting keys bytes in %s to %d", maxbytesPath, minBytes)
            err = ioutil.WriteFile(maxbytesPath, []byte(fmt.Sprintf("%d", uint64(minBytes))), 0644)
            if err != nil {
                glog.Warningf("Failed to update %s: %v", maxbytesPath, err)
            }
        }
    }

    // kubelet可以只運行一次,也可以作為一個后臺daemon一直運行
    // 一次運行的話,就是Runonce,處理下pods事件然后退出
    // 一直運行的話,就是startKubelet()
    // process pods and exit.
    if kcfg.Runonce {
        if _, err := k.RunOnce(podCfg.Updates()); err != nil {
            return fmt.Errorf("runonce failed: %v", err)
        }
        glog.Infof("Started kubelet %s as runonce", version.Get().String())
    } else {
        // 進入關鍵函數startKubelet()
        startKubelet(k, podCfg, kcfg)
        glog.Infof("Started kubelet %s", version.Get().String())
    }
    return nil
}

該接口中會調用CreateAndInitKubelet()接口再進行初始化,其中又調用了kubelet.NewMainKubelet()接口。
kubelet可以只運行一次,也可以后臺一直運行。要一直運行的話就是調用startKubelet()。
我們先看下初始化接口干了些什么?

func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) {
    // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
    // up into "per source" synchronizations
    // TODO: KubeletConfig.KubeClient should be a client interface, but client interface misses certain methods
    // used by kubelet. Since NewMainKubelet expects a client interface, we need to make sure we are not passing
    // a nil pointer to it when what we really want is a nil interface.
    var kubeClient clientset.Interface
    if kc.KubeClient != nil {
        kubeClient = kc.KubeClient
        // TODO: remove this when we"ve refactored kubelet to only use clientset.
    }

    // 初始化container GC參數
    gcPolicy := kubecontainer.ContainerGCPolicy{
        MinAge:             kc.MinimumGCAge,
        MaxPerPodContainer: kc.MaxPerPodContainerCount,
        MaxContainers:      kc.MaxContainerCount,
    }

    // 配置kubelet server的端口, default: 10250
    daemonEndpoints := &api.NodeDaemonEndpoints{
        KubeletEndpoint: api.DaemonEndpoint{Port: int32(kc.Port)},
    }

    // 創建PodConfig
    pc = kc.PodConfig
    if pc == nil {
        // kubelet支持三種數據源: file、HTTP URL、k8s APIServer
        // 默認是k8s APIServer,這里還會涉及到cache,可以深入學習下具體實現
        pc = makePodSourceConfig(kc)
    }
    // 
    k, err = kubelet.NewMainKubelet(
        kc.Hostname,
        kc.NodeName,
        kc.DockerClient,
        kubeClient,
。。。
    )

    if err != nil {
        return nil, nil, err
    }

    k.BirthCry()

    k.StartGarbageCollection()

    return k, pc, nil
}

初始化接口中還有一層調用:kubelet.NewMainKubelet(),該接口在1.3中是N多參數,并且函數實現也是很長很長,寫的非常不友好,不過看了下新版本已經重寫過了。我們還是拿這個又長又胖的接口,繼續了解下:

func NewMainKubelet(
    hostname string,
    nodeName string,
。。。
) (*Kubelet, error) {
。。。
    // 創建service的cache.NewStore, 設置service的監聽函數listWatch,并設置對應的反射NewReflector,然后設置serviceLister
    serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
    if kubeClient != nil {
        // TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
        // than an interface. There is no way to construct a list+watcher using resource name.
        listWatch := &cache.ListWatch{
            ListFunc: func(options api.ListOptions) (runtime.Object, error) {
                return kubeClient.Core().Services(api.NamespaceAll).List(options)
            },
            WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
                return kubeClient.Core().Services(api.NamespaceAll).Watch(options)
            },
        }
        cache.NewReflector(listWatch, &api.Service{}, serviceStore, 0).Run()
    }
    serviceLister := &cache.StoreToServiceLister{Store: serviceStore}
    
    // 創建node的cache.NewStore, 設置fieldSelector,設置監聽函數listWatch,設置對應的反射NewReflector,并設置nodeLister,nodeInfo和nodeRef
    nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
    if kubeClient != nil {
        // TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
        // than an interface. There is no way to construct a list+watcher using resource name.
        fieldSelector := fields.Set{api.ObjectNameField: nodeName}.AsSelector()
        listWatch := &cache.ListWatch{
            ListFunc: func(options api.ListOptions) (runtime.Object, error) {
                options.FieldSelector = fieldSelector
                return kubeClient.Core().Nodes().List(options)
            },
            WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
                options.FieldSelector = fieldSelector
                return kubeClient.Core().Nodes().Watch(options)
            },
        }
        cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run()
    }
    nodeLister := &cache.StoreToNodeLister{Store: nodeStore}
    nodeInfo := &predicates.CachedNodeInfo{StoreToNodeLister: nodeLister}

    // TODO: get the real node object of ourself,
    // and use the real node name and UID.
    // TODO: what is namespace for node?
    nodeRef := &api.ObjectReference{
        Kind:      "Node",
        Name:      nodeName,
        UID:       types.UID(nodeName),
        Namespace: "",
    }
    // 創建磁盤空間管理對象,該對象需要使用cAdvisor的接口來獲取磁盤相關信息
    // 最后一個參數便是配置磁盤管理的Policy
    diskSpaceManager, err := newDiskSpaceManager(cadvisorInterface, diskSpacePolicy)
    if err != nil {
        return nil, fmt.Errorf("failed to initialize disk manager: %v", err)
    }
    // 創建一個空的container reference manager對象
    containerRefManager := kubecontainer.NewRefManager()
    // 創建OOM 監控對象,使用cAdvisor接口監控內存,并使用event recorder上報oom事件
    oomWatcher := NewOOMWatcher(cadvisorInterface, recorder)

    // TODO: remove when internal cbr0 implementation gets removed in favor
    // of the kubenet network plugin
    if networkPluginName == "kubenet" {
        configureCBR0 = false
        flannelExperimentalOverlay = false
    }
    // 初始化Kubelet
    klet := &Kubelet{
        hostname:                       hostname,
        nodeName:                       nodeName,
    。。。
    }

...

    procFs := procfs.NewProcFS()
    imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)

    klet.livenessManager = proberesults.NewManager()
    // 初始化pod的cache和manager對象
    klet.podCache = kubecontainer.NewCache()
    klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient))

    // 初始化Docker container Runtime
    switch containerRuntime {
    case "docker":
        // dockerClient就是之后會介紹,就是kubelet用于操作docker的client
        // recorder: 即之前創建的event recorder
        // 還會有各類物理機信息,pull images的QPS等等參數
        // 具體可以了解下DockerManager結構
        // Only supported one for now, continue.
        klet.containerRuntime = dockertools.NewDockerManager(
            dockerClient,
            kubecontainer.FilterEventRecorder(recorder),
            klet.livenessManager,
            containerRefManager,
            klet.podManager,
            machineInfo,
            podInfraContainerImage,
            pullQPS,
            pullBurst,
            containerLogsDir,
            osInterface,
            klet.networkPlugin,
            klet,
            klet.httpClient,
            dockerExecHandler,
            oomAdjuster,
            procFs,
            klet.cpuCFSQuota,
            imageBackOff,
            serializeImagePulls,
            enableCustomMetrics,
            klet.hairpinMode == componentconfig.HairpinVeth,
            seccompProfileRoot,
            containerRuntimeOptions...,
        )
    case "rkt":
        ...
    default:
        return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)
    }

    ...

    // 設置containerGC
    containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy)
    if err != nil {
        return nil, err
    }
    klet.containerGC = containerGC

    // 設置imageManager
    imageManager, err := newImageManager(klet.containerRuntime, cadvisorInterface, recorder, nodeRef, imageGCPolicy)
    if err != nil {
        return nil, fmt.Errorf("failed to initialize image manager: %v", err)
    }
    klet.imageManager = imageManager

    klet.runner = klet.containerRuntime
    // 設置statusManager
    klet.statusManager = status.NewManager(kubeClient, klet.podManager)
    // 設置probeManager
    klet.probeManager = prober.NewManager(
        klet.statusManager,
        klet.livenessManager,
        klet.runner,
        containerRefManager,
        recorder)

    klet.volumePluginMgr, err =
        NewInitializedVolumePluginMgr(klet, volumePlugins)
    if err != nil {
        return nil, err
    }
    // 設置volumeManager
    klet.volumeManager, err = kubeletvolume.NewVolumeManager(
        enableControllerAttachDetach,
        hostname,
        klet.podManager,
        klet.kubeClient,
        klet.volumePluginMgr,
        klet.containerRuntime)

    // 創建runtime Cache對象
    runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
    if err != nil {
        return nil, err
    }
    klet.runtimeCache = runtimeCache
    klet.reasonCache = NewReasonCache()
    klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
    // 創建podWorkers對象,這個比較關鍵,后面會多帶帶介紹
    klet.podWorkers = newPodWorkers(klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)

    klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
    klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)
    klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()

    // 設置eviction manager
    evictionManager, evictionAdmitHandler, err := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers), recorder, nodeRef, klet.clock)
    if err != nil {
        return nil, fmt.Errorf("failed to initialize eviction manager: %v", err)
    }
    klet.evictionManager = evictionManager
    klet.AddPodAdmitHandler(evictionAdmitHandler)

    // apply functional Option"s
    for _, opt := range kubeOptions {
        opt(klet)
    }
    return klet, nil
}

該接口中,會創建podWorkers,該對象比較重要,跟pod的實際操作有關,后面會多帶帶進行介紹。這里先只點到為止。
我們回想下整個流程就會發現,cmd/kubelet/app主要就是做一些簡單的參數處理,具體的初始化都是在pkg/kubelet中做的。
看完初始化,我們要進入真正運行的接口startKubelet():

func startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfig) {
    // 這里是真正的啟動kubelet
    go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop)

    // 這里是開啟kubelet Server,便于調用kubelet的API進行操作
    if kc.EnableServer {
        go wait.Until(func() {
            k.ListenAndServe(kc.Address, kc.Port, kc.TLSOptions, kc.Auth, kc.EnableDebuggingHandlers)
        }, 0, wait.NeverStop)
    }
    // 該處是開啟kubelet的只讀服務,端口是10255
    if kc.ReadOnlyPort > 0 {
        go wait.Until(func() {
            k.ListenAndServeReadOnly(kc.Address, kc.ReadOnlyPort)
        }, 0, wait.NeverStop)
    }
}

繼續深入,進入到真正啟動kubelet的接口k.Run(),這個里的k是個KubeletBootstrap類型的interface,實際對象是由CreateAndInitKubelet()接口返回的Kubelet對象,所以Run()實現可以查看該對象的實現。
具體實現路徑:pkg/kubelet/kubelet.go,接口如下:

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    // 開啟日志服務
    if kl.logServer == nil {
        kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
    }
    if kl.kubeClient == nil {
        glog.Warning("No api server defined - no node status update will be sent.")
    }
    // init modulers,如imageManager、containerManager、oomWathcer、resourceAnalyzer
    if err := kl.initializeModules(); err != nil {
        kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, kubecontainer.KubeletSetupFailed, err.Error())
        glog.Error(err)
        kl.runtimeState.setInitError(err)
    }

    // Start volume manager
    go kl.volumeManager.Run(wait.NeverStop)

    // 起協程,定時向APIServer更新node status
    if kl.kubeClient != nil {
        // Start syncing node status immediately, this may set up things the runtime needs to run.
        go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
    }
    // 起協程,定時同步網絡狀態
    go wait.Until(kl.syncNetworkStatus, 30*time.Second, wait.NeverStop)
    go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

    // Start a goroutine responsible for killing pods (that are not properly
    // handled by pod workers).
    // 起協程,定時處理那些被killing pods
    go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

    // Start component sync loops.
    kl.statusManager.Start()
    kl.probeManager.Start()
    // 啟動evictionManager
    kl.evictionManager.Start(kl.getActivePods, evictionMonitoringPeriod)

    // Start the pod lifecycle event generator.
    kl.pleg.Start()
    // 開啟pods事件,用于處理APIServer下發的任務,updates是一個管道
    kl.syncLoop(updates, kl)
}

func (kl *Kubelet) initializeModules() error {
    // Step 1: Promethues metrics.
    metrics.Register(kl.runtimeCache)

    // Step 2: Setup filesystem directories.
    if err := kl.setupDataDirs(); err != nil {
        return err
    }

    // Step 3: If the container logs directory does not exist, create it.
    if _, err := os.Stat(containerLogsDir); err != nil {
        if err := kl.os.MkdirAll(containerLogsDir, 0755); err != nil {
            glog.Errorf("Failed to create directory %q: %v", containerLogsDir, err)
        }
    }

    // Step 4: Start the image manager.
    if err := kl.imageManager.Start(); err != nil {
        return fmt.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err)
    }

    // Step 5: Start container manager.
    if err := kl.containerManager.Start(); err != nil {
        return fmt.Errorf("Failed to start ContainerManager %v", err)
    }

    // Step 6: Start out of memory watcher.
    if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
        return fmt.Errorf("Failed to start OOM watcher %v", err)
    }

    // Step 7: Start resource analyzer
    kl.resourceAnalyzer.Start()

    return nil
}

到這里基本就結束了,學習源碼的過程中會發現很多點值得深入研究,比如:

dockerclient

podWorkers

podManager

cAdvisor

containerGC

imageManager

diskSpaceManager

statusManager

volumeManager

containerRuntime

kubelet cache

events recorder

Eviction Manager

kubelet如何收到APIServer任務,創建pod的流程

等等。。

后面會繼續挑一些關鍵點進行分析。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/32540.html

相關文章

  • Kubelet源碼分析(三):Garbage Collection

    摘要:源碼版本介紹在分析啟動流程時,老是會碰到各類,這里單獨提出來做下較詳細的分析。主要由兩部分組成使用指定的回收策略,刪除那些已經結束的所有的生命周期管理就是通過來實現的,其實該也是依賴了。相關配置該值表示磁盤占用率達到該值后會觸發。 源碼版本 kubernetes version: v1.3.0 kubelet GC介紹 在分析kubelet啟動流程時,老是會碰到各類GC,這里單獨提出來...

    siberiawolf 評論0 收藏0
  • Kubernetes監控之Heapster源碼分析

    摘要:源碼版本簡介是下的一個監控項目,用于進行容器集群的監控和性能分析?;镜墓δ芗案拍罱榻B可以回顧我之前的一篇文章監控之介紹。在源碼分析之前我們先介紹的實現流程,由上圖可以看出會從各個上獲取相關的監控信息,然后進行匯總發送給后臺數據庫。 源碼版本 heapster version: release-1.2 簡介 Heapster是Kubernetes下的一個監控項目,用于進行容器集群的監控...

    gclove 評論0 收藏0
  • kubeadm源碼分析(kubernetes離線安裝包,三步安裝)

    摘要:離線安裝包三步安裝,簡單到難以置信源碼分析說句實在話,的代碼寫的真心一般,質量不是很高。然后給該租戶綁定角色。 k8s離線安裝包 三步安裝,簡單到難以置信 kubeadm源碼分析 說句實在話,kubeadm的代碼寫的真心一般,質量不是很高。 幾個關鍵點來先說一下kubeadm干的幾個核心的事: kubeadm 生成證書在/etc/kubernetes/pki目錄下 kubeadm 生...

    Eirunye 評論0 收藏0
  • kubeadm源碼分析(kubernetes離線安裝包,三步安裝)

    摘要:離線安裝包三步安裝,簡單到難以置信源碼分析說句實在話,的代碼寫的真心一般,質量不是很高。然后給該租戶綁定角色。 k8s離線安裝包 三步安裝,簡單到難以置信 kubeadm源碼分析 說句實在話,kubeadm的代碼寫的真心一般,質量不是很高。 幾個關鍵點來先說一下kubeadm干的幾個核心的事: kubeadm 生成證書在/etc/kubernetes/pki目錄下 kubeadm 生...

    Heier 評論0 收藏0

發表評論

0條評論

mindwind

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<