ingress-nginx 工作原理(下):服务启动、运行和停止

Table of Contents

接上文。

1 服务启动

在 main 中 ngx.Start() 将启动服务。

// internal/ingress/controller/nginx.go
func (n *NGINXController) Start() {
    // (1)
    n.store.Run(n.stopCh)

    // (2)
    cmd := exec.Command(n.binary, "-c", cfgPath)
    // put nginx in another process group to prevent it
    // to receive signals meant for the controller
    // 把 nginx 扔进另外一个进程组,防止他接受控制器的信号
    cmd.SysProcAttr = &syscall.SysProcAttr{
        Setpgid: true,
        Pgid:    0,
    }
    n.start(cmd)

    // (3)
    go n.syncQueue.Run(time.Second, n.stopCh)

    // (4)
    // force initial sync
    n.syncQueue.Enqueue(&extensions.Ingress{})

    // (5)
    for {
        select {
        case err := <-n.ngxErrCh:
            if n.isShuttingDown {
                break
            }

            if process.IsRespawnIfRequired(err) {
                process.WaitUntilPortIsAvailable(n.cfg.ListenPorts.HTTP)
                // release command resources
                cmd.Process.Release()
                // start a new nginx master process if the controller is not being stopped
                cmd = exec.Command(n.binary, "-c", cfgPath)
                cmd.SysProcAttr = &syscall.SysProcAttr{
                    Setpgid: true,
                    Pgid:    0,
                }
                n.start(cmd)
            }
        case event := <-n.updateCh.Out():
            if n.isShuttingDown {
                break
            }
            if evt, ok := event.(store.Event); ok {
                glog.V(3).Infof("Event %v received - object %v", evt.Type, evt.Obj)
                if evt.Type == store.ConfigurationEvent {
                    n.SetForceReload(true)
                }

                n.syncQueue.Enqueue(evt.Obj)
            } else {
                glog.Warningf("unexpected event type received %T", event)
            }
        case <-n.stopCh:
            break
        }
    }
}

1、=store.Run= 会调用 informers.Run 进而触发 Endpoint、Service、ingress、等 informer 的 Run=,总之是触发 Informer 开始监听各种 Add、Delete、Update 事件,当接收到事件时,写入 =updateCh 中。

2、启动 nginx 进程(配置路径:="/etc/nginx/nginx.conf"),nginx 运行出错时会将错误写入到 =ngxErrCh 中。

3、启动 syncQueue=,=syncQueue 是个 task.Queue=,任务队列。它所做的事情是不断的从队列中取出元素,回调某个函数,然后把该元素删除。这里的回调函数是 =syncIngress 在初始化 NGINXController 时设置的。=syncIngress= 完成了将传入的 Ingress Event 处理之后生成新的 nginx 配置文件,然后 reload nginx,在文章后面会详细介绍。至于 task.Queue 我会单独写一篇文章说明内部实现。

4、向 syncQueue 压入一个空的 Ingress 元素(=syncQueue= 刚启动,这是第一个队列中的元素),将触发全量的同步 Ingress 。 注意:虽然 store.Run 之后,在这个时间段内有可能会有资源发生变更,但是变更之后的资源会记录在 updateCh 中,而不是 =syncQueue=,并不会触发 nginx reload。

5、服务运行

2 服务运行

服务正常运行是不断从 K8s 接受资源变更事件,然后转换成 nginx 新配置,进而 reload nginx 的过程。也就是上面 for ... selectcase event : <-n.updateCh.Out()= 中所做的事情:从 updateCh 中获取事件,然后添加到 syncQueue=,因为 =syncQueue 已经在运行状态了,新添加的事件对象(资源)很快会被 syncIngress 处理。让我们回顾一下整个过程(这是 ingress-nginx 的核心):

  1. syncIngress 函数为参数初始化 syncQueue && 创建一个大小为 1024 的环形 Channel updateCh
  2. 初始时向 syncQueue 添加一个空的 Ingress
  3. informer 监听 K8s 资源变化触发 AddFunc=、=DeleteFunc=、=UpdateFunc 将变更的资源添加到 updateCh
  4. for ... select 不断的从 updateCh 中取出变更资源,然后添加到 syncQueue
  5. syncQueue 不断从队列中弹出元素,然后调用 syncIngress 处理

3 服务终止

上面的 for ... select 中的 case <-n.stopCh=,等 =stopCh 有数据时,for 循环 break,服务正常结束。

main 函数中有这么一段代码:

go handleSigterm(ngx, func(code int) {
    os.Exit(code)
})

handleSigterm 的实现为:

func handleSigterm(ngx *controller.NGINXController, exit exiter) {
    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, syscall.SIGTERM)
    <-signalChan
    glog.Infof("Received SIGTERM, shutting down")

    exitCode := 0
    if err := ngx.Stop(); err != nil {
        glog.Infof("Error during shutdown %v", err)
        exitCode = 1
    }

    glog.Infof("Handled quit, awaiting pod deletion")
    time.Sleep(10 * time.Second)

    glog.Infof("Exiting with %v", exitCode)
    exit(exitCode)
}

接受任意的 SIGTERM 信号,然后调用 =ngx.Stop()=:

func (n *NGINXController) Stop() error {
    n.isShuttingDown = true

    // 关闭 stopCh 管道
    close(n.stopCh)

    // 关闭 syncQueue
    go n.syncQueue.Shutdown()

    // 退出 nginx 进程
    cmd := exec.Command(n.binary, "-c", cfgPath, "-s", "quit")
    cmd.Stdout = os.Stdout
    cmd.Stderr = os.Stderr
    err := cmd.Run()

    return nil
}

发送 SIGTERM 信号可能是终端(=kill pid=),也可能是调用 /stop

mux.HandleFunc("/stop", func(w http.ResponseWriter, r *http.Request) {
    err := syscall.Kill(syscall.Getpid(), syscall.SIGTERM) // <- 这里
})

服务的启动、运行、停止就介绍完了。下面介绍一下 syncQueue 的处理函数 =syncIngress=。

4 syncIngress

// ingress/controller/controller.go
func (n *NGINXController) syncIngress(interface{}) error {
    n.syncRateLimiter.Accept()

    if n.syncQueue.IsShuttingDown() {
        return nil
    }

    // (1)
    ings := n.store.ListIngresses()
    sort.SliceStable(ings, func(i, j int) bool {
        ir := ings[i].ResourceVersion
        jr := ings[j].ResourceVersion
        return ir < jr
    })

    // (2)
    upstreams, servers := n.getBackendServers(ings)
    var passUpstreams []*ingress.SSLPassthroughBackend

    for _, server := range servers {
        if !server.SSLPassthrough {
            continue
        }

        for _, loc := range server.Locations {
            if loc.Path != rootLocation {
                glog.Warningf("ignoring path %v of ssl passthrough host %v", loc.Path, server.Hostname)
                continue
            }
            passUpstreams = append(passUpstreams, &ingress.SSLPassthroughBackend{
                Backend:  loc.Backend,
                Hostname: server.Hostname,
                Service:  loc.Service,
                Port:     loc.Port,
            })
            break
        }
    }

    // (3)
    pcfg := ingress.Configuration{
        Backends:            upstreams,
        Servers:             servers,
        TCPEndpoints:        n.getStreamServices(n.cfg.TCPConfigMapName, apiv1.ProtocolTCP),
        UDPEndpoints:        n.getStreamServices(n.cfg.UDPConfigMapName, apiv1.ProtocolUDP),
        PassthroughBackends: passUpstreams,
    }

    // (4)
    if !n.isForceReload() && n.runningConfig.Equal(&pcfg) {
        glog.V(3).Infof("skipping backend reload (no changes detected)")
        return nil
    }

    // (5)
    err := n.OnUpdate(pcfg)

    // (6)
    n.runningConfig = &pcfg

    return nil
}

1、=syncIngress= 的参数虽然为 interface{} ,但是并没有使用参数,数据是从 store.ListIngress() 中获取的。在 Informers 的事件回调处理函数 AddFunc 中有下面三行代码:

store.extractAnnotations(ing)
store.updateSecretIngressMap(ing)
store.syncSecrets(ing)

估计是在 store 单独存储了一份 K8s ingress cache,在资源变更时同步更新缓存。所以在 syncIngress 每次都是拿到全部的 Ingresses。也就是说 syncQueue 中添加元素只是为了触发 syncQueue=,而 =Enqueue 的传入的数据是什么是无意义的。

2、根据 ingress 列表生成 backend 列表和 server 列表。backend 对应的是 NGINX 的 upstreams,而 server 对应的也是 NGINX 的 server。

3、=ingress.Configuration= 对应最终的 nginx 配置文件所需的数据。

4、不是强制重启条件并且 nginx 当前的配置和新配置的相同时,跳过本次同步。

5、=onUpdate= 更新配置文件,更新流程为:

  1. 将 configmap 形式的 configuration 转换成普通的 configuration 对象(更贴合 nginx 配置形式的)
  2. 将配置写入自定义的配置模板(内存中)
  3. 先写入一个临时文件,测试配置是否存在问题:=nginx -t -c tmpfile=
  4. 写入配置文件 "/etc/nginx/nginx.conf"
  5. reload nginx:=nginx -s reload -c /etc/nginx/nginx.conf=

6、将新的配置文件作为替换正在运行的配置文件(以便下次更新对比)。


这两篇文章简单讲解了 ingress-nginx 的生命周期,不够深入也不够全面。ingress-nginx 细节很多,有很多代码都是与 SSL 相关的,没费心思看。当初看 ingress-nginx 的初衷是研究如何监听 K8s 资源变更事件的,看了源代码之后才知道它使用了 K8s client 的 informers 组件来实现的,而 store 模块 cache 也是封装了 =k8s.io/client-go/tools/cache=,它自己只是实现了自己的逻辑(ingress->nginx config),感兴趣大家可以去看看相关资料。

Date: 2018-06-13 10:52:40

Author: JerryZhang