国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Kubelet源碼分析(二): DockerClient

李世贊 / 3118人閱讀

摘要:接口分析源碼目錄實現的接口如下可以看到結構體實現了所有的接口。這些接口其實是對的操作接口進行了封裝,下面取一個接口進行分析該接口的關鍵就是調用了所以關鍵對象還是,繼續回到上面講初始化時介紹到的結構體。

源碼版本

kubernetes version: v1.3.0

DockerClient初始化

DockerClient是KubeletConfig的成員之一。
KubeletConfig結構介紹:

type KubeletConfig struct {
    Address                        net.IP
    AllowPrivileged                bool
...
    DockerClient                   dockertools.DockerInterface
    RuntimeCgroups                 string
    DockerExecHandler              dockertools.ExecHandler
...
}

而kubeletConfig的初始化是在UnsecuredKubeletConfig()接口中進行的,需要依賴最開始組建的kubeletServer配置結構,該kubeletServer結構中有DockerEndpoint字符串成員:

type KubeletServer struct {
    componentconfig.KubeletConfiguration

    AuthPath      util.StringFlag // Deprecated -- use KubeConfig instead
    KubeConfig    util.StringFlag
    APIServerList []string

    RunOnce bool

    // Insert a probability of random errors during calls to the master.
    ChaosChance float64
    // Crash immediately, rather than eating panics.
    ReallyCrashForTesting bool
    SystemReserved        config.ConfigurationMap
    KubeReserved          config.ConfigurationMap
}

type KubeletConfiguration struct {
    // config is the path to the config file or directory of files
    Config string `json:"config"`
...
    DockerEndpoint string `json:"dockerEndpoint"`
...    

實際上如果沒有指定該參數的話,會默認使用端點"unix:///var/run/docker.sock"做為DockerEndpoint。可以查看NewEnvClient()接口。

回到kubeletConfig的初始化接口UnsecuredKubeletConfig():

func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) {
    hostNetworkSources, err := kubetypes.GetValidatedSources(strings.Split(s.HostNetworkSources, ","))
    if err != nil {
        return nil, err
    }
...
    return &KubeletConfig{
        Address:                      net.ParseIP(s.Address),
        AllowPrivileged:              s.AllowPrivileged,
        ...
        DockerClient:                 dockertools.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration), // TODO(random-liu): Set RuntimeRequestTimeout for rkt.
...
    }

接著繼續查看dockertools.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration)。

func ConnectToDockerOrDie(dockerEndpoint string, requestTimeout time.Duration) DockerInterface {
    if dockerEndpoint == "fake://" {
        return NewFakeDockerClient()
    }
    client, err := getDockerClient(dockerEndpoint)
    if err != nil {
        glog.Fatalf("Couldn"t connect to docker: %v", err)
    }
    glog.Infof("Start docker client with request timeout=%v", requestTimeout)
    return newKubeDockerClient(client, requestTimeout)
}

先前我們了解了如果在kubelet啟動時沒有傳入"docker-endpoint"參數的話,s.DockerEndpoint即為空。
s.RuntimeRequestTimeout.Duration值可以查看NewKubeletServer()函數的初始化,是2min。
getDockerClient()接口比較簡單:
getDockerClient --> dockerapi.NewEnvClient() --> NewClient().
NewClient()接口如下:

func NewClient(host string, version string, client *http.Client, httpHeaders map[string]string) (*Client, error) {
    proto, addr, basePath, err := ParseHost(host)
    if err != nil {
        return nil, err
    }

    transport, err := transport.NewTransportWithHTTP(proto, addr, client)
    if err != nil {
        return nil, err
    }

    return &Client{
        proto:             proto,
        addr:              addr,
        basePath:          basePath,
        transport:         transport,
        version:           version,
        customHTTPHeaders: httpHeaders,
    }, nil
}

之前講了如果沒有傳入"docker-endpoint"參數的話,默認值就是"unix:///var/run/docker.sock".即host參數為該值。
ParseHost()先根據host進行解析,然后創建transport-->Client。
Client結構如下:

type Client struct {
    // proto holds the client protocol i.e. unix.
    proto string
    // addr holds the client address.
    addr string
    // basePath holds the path to prepend to the requests.
    basePath string
    // transport is the interface to send request with, it implements transport.Client.
    transport transport.Client
    // version of the server to talk to.
    version string
    // custom http headers configured by users.
    customHTTPHeaders map[string]string
}

創建Client成功之后,最終開始提到的ConnectToDockerOrDie()接口會調用newKubeDockerClient()生成pkg/kubelet/dockertools/kube_docker_client.go里的kubeDockerClient結構:

type kubeDockerClient struct {
    // timeout is the timeout of short running docker operations.
    timeout time.Duration
    client  *dockerapi.Client
}

初始化到這里就結束了,那我們回到最初,介紹下DockerClient定義:
dockertools.DockerInterface如下:

type DockerInterface interface {
    ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error)
    InspectContainer(id string) (*dockertypes.ContainerJSON, error)
    CreateContainer(dockertypes.ContainerCreateConfig) (*dockertypes.ContainerCreateResponse, error)
    StartContainer(id string) error
    StopContainer(id string, timeout int) error
    RemoveContainer(id string, opts dockertypes.ContainerRemoveOptions) error
    InspectImage(image string) (*dockertypes.ImageInspect, error)
    ListImages(opts dockertypes.ImageListOptions) ([]dockertypes.Image, error)
    PullImage(image string, auth dockertypes.AuthConfig, opts dockertypes.ImagePullOptions) error
    RemoveImage(image string, opts dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDelete, error)
    ImageHistory(id string) ([]dockertypes.ImageHistory, error)
    Logs(string, dockertypes.ContainerLogsOptions, StreamOptions) error
    Version() (*dockertypes.Version, error)
    Info() (*dockertypes.Info, error)
    CreateExec(string, dockertypes.ExecConfig) (*dockertypes.ContainerExecCreateResponse, error)
    StartExec(string, dockertypes.ExecStartCheck, StreamOptions) error
    InspectExec(id string) (*dockertypes.ContainerExecInspect, error)
    AttachToContainer(string, dockertypes.ContainerAttachOptions, StreamOptions) error
}

而我們最終初始化返回了結構體kubeDockerClient,所以DockerInterface接口的實現,我們可以回到kubeDockerClient結構體所在文件pkg/kubelet/dockertools/kube_docker_client.go查看接口實現。

DockeClient接口分析

源碼目錄: pkg/kubelet/dockertools/kube_docker_client.go
實現的接口如下:

可以看到kubeDockerClient結構體實現了所有的DockerInterface接口。
這些接口其實是對docker的操作接口進行了封裝,下面取一個接口進行分析:

func (d *kubeDockerClient) ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error) {
    ctx, cancel := d.getTimeoutContext()
    defer cancel()
    containers, err := d.client.ContainerList(ctx, options)
    if ctxErr := contextError(ctx); ctxErr != nil {
        return nil, ctxErr
    }
    if err != nil {
        return nil, err
    }
    return containers, nil
}

該ListContainers()接口的關鍵就是調用了d.client.ContainerList(ctx, options).
所以關鍵對象還是client,繼續回到上面講初始化時介紹到的Client結構體。
Client結構所在文件: vendor/github.com/docker/engine-api/client/client.go
Client package結構:

操作docker API的接口都封裝在這些文件中,有空可以深入了解下,這里就不一一介紹了,我們繼續回到d.client.ContainerList(ctx, options),實現如下:

func (cli *Client) ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) {
    query := url.Values{}

    if options.All {
        query.Set("all", "1")
    }

    if options.Limit != -1 {
        query.Set("limit", strconv.Itoa(options.Limit))
    }

    if options.Since != "" {
        query.Set("since", options.Since)
    }

    if options.Before != "" {
        query.Set("before", options.Before)
    }

    if options.Size {
        query.Set("size", "1")
    }

    if options.Filter.Len() > 0 {
        filterJSON, err := filters.ToParamWithVersion(cli.version, options.Filter)

        if err != nil {
            return nil, err
        }

        query.Set("filters", filterJSON)
    }

    resp, err := cli.get(ctx, "/containers/json", query, nil)
    if err != nil {
        return nil, err
    }

    var containers []types.Container
    err = json.NewDecoder(resp.body).Decode(&containers)
    ensureReaderClosed(resp)
    return containers, err
}

前面都是一些參數初始化,其實就是構建一個GET請求,然后調用cli.get(),該get就是一個httpRequest:

func (cli *Client) get(ctx context.Context, path string, query url.Values, headers map[string][]string) (*serverResponse, error) {
    return cli.sendRequest(ctx, "GET", path, query, nil, headers)
}

func (cli *Client) sendRequest(ctx context.Context, method, path string, query url.Values, obj interface{}, headers map[string][]string) (*serverResponse, error) {
    var body io.Reader

    if obj != nil {
        var err error
        body, err = encodeData(obj)
        if err != nil {
            return nil, err
        }
        if headers == nil {
            headers = make(map[string][]string)
        }
        headers["Content-Type"] = []string{"application/json"}
    }

    return cli.sendClientRequest(ctx, method, path, query, body, headers)
}

func (cli *Client) sendClientRequest(ctx context.Context, method, path string, query url.Values, body io.Reader, headers map[string][]string) (*serverResponse, error) {
    serverResp := &serverResponse{
        body:       nil,
        statusCode: -1,
    }

...
    req, err := cli.newRequest(method, path, query, body, headers)
    if cli.proto == "unix" || cli.proto == "npipe" {
        // For local communications, it doesn"t matter what the host is. We just
        // need a valid and meaningful host name. (See #189)
        req.Host = "docker"
    }
    req.URL.Host = cli.addr
    req.URL.Scheme = cli.transport.Scheme()

    if expectedPayload && req.Header.Get("Content-Type") == "" {
        req.Header.Set("Content-Type", "text/plain")
    }

    resp, err := cancellable.Do(ctx, cli.transport, req)
    if resp != nil {
        serverResp.statusCode = resp.StatusCode
    }

...

    if serverResp.statusCode < 200 || serverResp.statusCode >= 400 {
        body, err := ioutil.ReadAll(resp.Body)
        if err != nil {
            return serverResp, err
        }
        if len(body) == 0 {
            return serverResp, fmt.Errorf("Error: request returned %s for API route and version %s, check if the server supports the requested API version", http.StatusText(serverResp.statusCode), req.URL)
        }
        return serverResp, fmt.Errorf("Error response from daemon: %s", bytes.TrimSpace(body))
    }

    serverResp.body = resp.Body
    serverResp.header = resp.Header
    return serverResp, nil
}

func Do(ctx context.Context, client transport.Sender, req *http.Request) (*http.Response, error) {
...
    result := make(chan responseAndError, 1)

    go func() {
        resp, err := client.Do(req)
        testHookDoReturned()
        result <- responseAndError{resp, err}
    }()

    var resp *http.Response

    select {
    case <-ctx.Done():
        testHookContextDoneBeforeHeaders()
        cancel()
        // Clean up after the goroutine calling client.Do:
        go func() {
            if r := <-result; r.resp != nil && r.resp.Body != nil {
                testHookDidBodyClose()
                r.resp.Body.Close()
            }
        }()
        return nil, ctx.Err()
    case r := <-result:
        var err error
        resp, err = r.resp, r.err
        if err != nil {
            return resp, err
        }
    }

...

    return resp, nil
}

上面列出了httpRequest的整個調用過程,最終調用client.Do(),該client對象需要回到之前的初始化過程中去,實際就是調用vemdor/github.com/docker/engine-api/client/client.go中的Client.transport,而該對象初始化時設置為apiTransport對象:

type apiTransport struct {
    *http.Client
    *tlsInfo
    transport *http.Transport
}

所以client.Do()實際就是調用http.Client.Do()。
OK,到此算是分析結束,具體的各個接口實現,還是需要花時間查看源碼,但也都是大同小異。

學習源碼的過程中,可以看到很多經典的實現,比如上面介紹的cancellable.Do()接口實現,golang非常推崇的"協程+channel"的方式,通過select case的方式循環等待協程處理的結果,確實很方便。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/32537.html

相關文章

  • Kubelet源碼分析(一):啟動流程分析

    摘要:源碼版本簡介在急群眾,在每個節點上都會啟動一個服務進程。該進程用于處理節點下發到本節點的任務,管理及中的容器。每個進程會在上注冊節點自身信息,定期向節點匯報節點資源的使用情況,并通過監控容器和節點資源。最后運行健康檢測服務。 源碼版本 kubernetes version: v1.3.0 簡介 在Kubernetes急群眾,在每個Node節點上都會啟動一個kubelet服務進程。該進程...

    mindwind 評論0 收藏0
  • Kubelet源碼分析(四) diskSpaceManager

    摘要:顧名思義就是管理磁盤空間的,實際它的實現較為簡單,就是給所在的節點預留磁盤空間的,當該節點磁盤空間低于該值時,將拒絕的創建。其實就是中的接口該接口很簡單,就是分別調用實現的兩個接口,然后判斷磁盤空間是否夠用。 源碼版本 kubernetes version: v1.3.0 簡介 前一節介紹了Garbage Collection,涉及到的策略基本與磁盤資源有關。對于k8s集群如何高效的利...

    hlcfan 評論0 收藏0
  • Kubelet源碼分析(三):Garbage Collection

    摘要:源碼版本介紹在分析啟動流程時,老是會碰到各類,這里單獨提出來做下較詳細的分析。主要由兩部分組成使用指定的回收策略,刪除那些已經結束的所有的生命周期管理就是通過來實現的,其實該也是依賴了。相關配置該值表示磁盤占用率達到該值后會觸發。 源碼版本 kubernetes version: v1.3.0 kubelet GC介紹 在分析kubelet啟動流程時,老是會碰到各類GC,這里單獨提出來...

    siberiawolf 評論0 收藏0
  • Kubelet無法訪問rancher-metadata問題分析

    摘要:引言能夠支持,可以快速幾乎無障礙的拉起一套環境,這對剛入門的小白來說簡直是一大利器。本文就分析一下關于無法訪問問題。檢查一下有問題的節點的系統,果然會發現安裝了服務服務名為。 引言 Rancher能夠支持Kubernetes,可以快速幾乎無障礙的拉起一套K8s環境,這對剛入門K8s的小白來說簡直是一大利器。當然由于系統特性五花八門,系統內置軟件也相互影響,所以有時候伙伴們會碰到比較難纏...

    hiYoHoo 評論0 收藏0
  • Kubelet無法訪問rancher-metadata問題分析

    摘要:引言能夠支持,可以快速幾乎無障礙的拉起一套環境,這對剛入門的小白來說簡直是一大利器。本文就分析一下關于無法訪問問題。檢查一下有問題的節點的系統,果然會發現安裝了服務服務名為。 引言 Rancher能夠支持Kubernetes,可以快速幾乎無障礙的拉起一套K8s環境,這對剛入門K8s的小白來說簡直是一大利器。當然由于系統特性五花八門,系統內置軟件也相互影響,所以有時候伙伴們會碰到比較難纏...

    bigdevil_s 評論0 收藏0

發表評論

0條評論

李世贊

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<