ingress-nginx 工作原理(下):服务启动、运行和停止

Table of Contents

接上文。

1. 服务启动

mainngx.Start() 将启动服务。

// internal/ingress/controller/nginx.go
func (n *NGINXController) Start() {
    // (1)
    n.store.Run(n.stopCh)

    // (2)
    cmd := exec.Command(n.binary, "-c", cfgPath)
    // put nginx in another process group to prevent it
    // to receive signals meant for the controller
    // 把 nginx 扔进另外一个进程组,防止他接受控制器的信号
    cmd.SysProcAttr = &syscall.SysProcAttr{
        Setpgid: true,
        Pgid:    0,
    }
    n.start(cmd)

    // (3)
    go n.syncQueue.Run(time.Second, n.stopCh)

    // (4)
    // force initial sync
    n.syncQueue.Enqueue(&extensions.Ingress{})

    // (5)
    for {
        select {
        case err := <-n.ngxErrCh:
            if n.isShuttingDown {
                break
            }

            if process.IsRespawnIfRequired(err) {
                process.WaitUntilPortIsAvailable(n.cfg.ListenPorts.HTTP)
                // release command resources
                cmd.Process.Release()
                // start a new nginx master process if the controller is not being stopped
                cmd = exec.Command(n.binary, "-c", cfgPath)
                cmd.SysProcAttr = &syscall.SysProcAttr{
                    Setpgid: true,
                    Pgid:    0,
                }
                n.start(cmd)
            }
        case event := <-n.updateCh.Out():
            if n.isShuttingDown {
                break
            }
            if evt, ok := event.(store.Event); ok {
                glog.V(3).Infof("Event %v received - object %v", evt.Type, evt.Obj)
                if evt.Type == store.ConfigurationEvent {
                    n.SetForceReload(true)
                }

                n.syncQueue.Enqueue(evt.Obj)
            } else {
                glog.Warningf("unexpected event type received %T", event)
            }
        case <-n.stopCh:
            break
        }
    }
}
  1. store.Run 会调用 informers.Run 进而触发 Endpoint、Service、ingress等 informer 的 Run , 总之是触发 Informer 开始监听各种 Add、Delete、Update 事件,当接收到事件时,写入 updateCh
  2. 启动 nginx 进程(配置路径: "/etc/nginx/nginx.conf" ),nginx 运行出错时会将错误写入到 ngxErrCh
  3. 启动 syncQueue=,=syncQueue 是个 task.Queue ,任务队列。它所做的事情是不断的从队列中取出元素,回调某个函数,然后把该元素删除。 这里的回调函数是 syncIngress 在初始化 NGINXController 时设置的。 syncIngress 完成了将传入的 Ingress Event 处理之后生成新的 nginx 配置文件,然后 reload nginx,在文章后面会详细介绍。至于 task.Queue 我会单独写一篇文章说明内部实现
  4. syncQueue 压入一个空的 Ingress 元素( syncQueue 刚启动,这是第一个队列中的元素),将触发全量的同步 Ingress 。 注意:虽然 store.Run 之后,在这个时间段内有可能会有资源发生变更,但是变更之后的资源会记录在 updateCh 中,而不是 syncQueue , 并不会触发 nginx reload
  5. 服务运行

2. 服务运行

服务正常运行是不断从 K8s 接受资源变更事件,然后转换成 nginx 新配置,进而 reload nginx 的过程。

也就是上面 for ... selectcase event := <-n.updateCh.Out() 中所做的事情:从 updateCh 中获取事件,然后添加到 syncQueue , 因为 syncQueue 已经在运行状态了,新添加的事件对象(资源)很快会被 syncIngress 处理。

让我们回顾一下整个过程(这是 ingress-nginx 的核心):

  1. syncIngress 函数为参数初始化 syncQueue && 创建一个大小为 1024 的环形 Channel updateCh
  2. 初始时向 syncQueue 添加一个空的 Ingress
  3. informer 监听 K8s 资源变化触发 AddFunc DeleteFunc UpdateFunc 将变更的资源添加到 updateCh
  4. for ... select 不断的从 updateCh 中取出变更资源,然后添加到 syncQueue
  5. syncQueue 不断从队列中弹出元素,然后调用 syncIngress 处理

3. 服务终止

上面的 for ... select 中的 case <-n.stopCh ,等 stopCh 有数据时,for 循环 break,服务正常结束。

main 函数中有这么一段代码:

go handleSigterm(ngx, func(code int) {
    os.Exit(code)
})

handleSigterm 的实现为:

func handleSigterm(ngx *controller.NGINXController, exit exiter) {
    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, syscall.SIGTERM)
    <-signalChan
    glog.Infof("Received SIGTERM, shutting down")

    exitCode := 0
    if err := ngx.Stop(); err != nil {
        glog.Infof("Error during shutdown %v", err)
        exitCode = 1
    }

    glog.Infof("Handled quit, awaiting pod deletion")
    time.Sleep(10 * time.Second)

    glog.Infof("Exiting with %v", exitCode)
    exit(exitCode)
}

接受任意的 SIGTERM 信号,然后调用 ngx.Stop()

func (n *NGINXController) Stop() error {
    n.isShuttingDown = true

    // 关闭 stopCh 管道
    close(n.stopCh)

    // 关闭 syncQueue
    go n.syncQueue.Shutdown()

    // 退出 nginx 进程
    cmd := exec.Command(n.binary, "-c", cfgPath, "-s", "quit")
    cmd.Stdout = os.Stdout
    cmd.Stderr = os.Stderr
    err := cmd.Run()

    return nil
}

发送 SIGTERM 信号可能是终端(kill pid),也可能是调用 /stop

mux.HandleFunc("/stop", func(w http.ResponseWriter, r *http.Request) {
    err := syscall.Kill(syscall.Getpid(), syscall.SIGTERM) // <- 这里
})

服务的启动、运行、停止就介绍完了。下面介绍一下 syncQueue 的处理函数 syncIngress

4. syncIngress

// ingress/controller/controller.go
func (n *NGINXController) syncIngress(interface{}) error {
    n.syncRateLimiter.Accept()

    if n.syncQueue.IsShuttingDown() {
        return nil
    }

    // (1)
    ings := n.store.ListIngresses()
    sort.SliceStable(ings, func(i, j int) bool {
        ir := ings[i].ResourceVersion
        jr := ings[j].ResourceVersion
        return ir < jr
    })

    // (2)
    upstreams, servers := n.getBackendServers(ings)
    var passUpstreams []*ingress.SSLPassthroughBackend

    for _, server := range servers {
        if !server.SSLPassthrough {
            continue
        }

        for _, loc := range server.Locations {
            if loc.Path != rootLocation {
                glog.Warningf("ignoring path %v of ssl passthrough host %v", loc.Path, server.Hostname)
                continue
            }
            passUpstreams = append(passUpstreams, &ingress.SSLPassthroughBackend{
                Backend:  loc.Backend,
                Hostname: server.Hostname,
                Service:  loc.Service,
                Port:     loc.Port,
            })
            break
        }
    }

    // (3)
    pcfg := ingress.Configuration{
        Backends:            upstreams,
        Servers:             servers,
        TCPEndpoints:        n.getStreamServices(n.cfg.TCPConfigMapName, apiv1.ProtocolTCP),
        UDPEndpoints:        n.getStreamServices(n.cfg.UDPConfigMapName, apiv1.ProtocolUDP),
        PassthroughBackends: passUpstreams,
    }

    // (4)
    if !n.isForceReload() && n.runningConfig.Equal(&pcfg) {
        glog.V(3).Infof("skipping backend reload (no changes detected)")
        return nil
    }

    // (5)
    err := n.OnUpdate(pcfg)

    // (6)
    n.runningConfig = &pcfg

    return nil
}
  1. syncIngress 的参数虽然为 interface{} ,但是并没有使用参数,数据是从 store.ListIngress() 中获取的。 在 Informers 的事件回调处理函数 AddFunc 中有下面三行代码:

    store.extractAnnotations(ing)
    store.updateSecretIngressMap(ing)
    store.syncSecrets(ing)
    

    估计是在 store 单独存储了一份 K8s ingress cache,在资源变更时同步更新缓存。所以在 syncIngress 每次都是拿到全部的 Ingresses。 也就是说 syncQueue 中添加元素只是为了触发 syncQueue ,而 Enqueue 的传入的数据是什么是无意义的。

  2. 根据 ingress 列表生成 backend 列表和 server 列表。backend 对应的是 NGINX 的 upstreams,而 server 对应的也是 NGINX 的 server
  3. ingress.Configuration 对应最终的 nginx 配置文件所需的数据
  4. 不是强制重启条件并且 nginx 当前的配置和新配置的相同时,跳过本次同步
  5. onUpdate 更新配置文件,更新流程为:
    1. 将 configmap 形式的 configuration 转换成普通的 configuration 对象(更贴合 nginx 配置形式的)
    2. 将配置写入自定义的配置模板(内存中)
    3. 先写入一个临时文件,测试配置是否存在问题: nginx -t -c tmpfile
    4. 写入配置文件 "/etc/nginx/nginx.conf"
    5. reload nginx: nginx -s reload -c /etc/nginx/nginx.conf
  6. 将新的配置文件作为替换正在运行的配置文件(以便下次更新对比)

这两篇文章简单讲解了 ingress-nginx 的生命周期,不够深入也不够全面。ingress-nginx 细节很多,有很多代码都是与 SSL 相关的,没费心思看。

当初看 ingress-nginx 的初衷是研究如何监听 K8s 资源变更事件的,看了源代码之后才知道它使用了 K8s client 的 informers 组件来实现的, 而 store 模块 cache 也是封装了 k8s.io/client-go/tools/cache ,它自己只是实现了自己的逻辑(ingress->nginx config),感兴趣大家可以去看看相关资料。

First created: 2018-06-13 10:52:40
Last updated: 2022-12-11 Sun 12:49
Power by Emacs 27.1 (Org mode 9.4.4)