ingress-nginx 工作原理(上):服务初始化

Table of Contents

ingress-nginx 是 K8s ingress controller 的一种实现。这段时间在看它的源代码,源代码虽然不算很长(核心代码一万五千行左右),但是有很多的 K8s 依赖(除了依赖 K8s API server 和 client-go 以外,还依赖了 apimachinery 中的组件),如果对 K8s 的组件不是很熟的话,读起来还是有很多的困扰。

本文只介绍 ingress-nginx 工作原理,依赖组件(queue、watch 等)会在后续的文章介绍。源码主要由两个模块组成:cmd 和 internal,cmd 是入口(main),internal 按依赖组件分别进行了二次封装(file、ingress、k8s 等)。

工作原理如下图(图片看不太清,可点击下载到本地查看):

[[ Sorry, your browser does not support SVG.][ Sorry, your browser does not support SVG.]]

按照图片从上到下看已经把 ingress-nginx 大体流程显示清楚了,下面将站在源码的角度从服务的生命周期(服务初始化、服务启动、服务运行、服务停止)简单说一下实现上的一些细节。篇幅较长,分为上、下两篇文章。

另外,文中引用的源代码是精简过后的,删掉线路无关的代码。代码和文字说明部分用序号关联:=(1)= 对应 =1、=。


1 服务初始化

服务初始化,可以拆分成 K8s 客户端创建、NGINXController、对外 HTTP 服务、启动服务几块。

// cmd/nginx/main.go
func main() {
    // (1)
    showVersion, conf, err := parseFlags()
    if showVersion {
        os.Exit(0)
    }

    // (2)
    kubeClient, err := createApiserverClient(conf.APIServerHost, conf.KubeConfigFile)
    if err != nil {
        handleFatalInitError(err)
    }

    // (3)
    if conf.Namespace != "" {
        _, err = kubeClient.CoreV1().Namespaces().Get(conf.Namespace, metav1.GetOptions{})
        if err != nil {
            glog.Fatalf("no namespace with name %v found: %v", conf.Namespace, err)
        }
    }

    conf.Client = kubeClient

    // (4)
    ngx := controller.NewNGINXController(conf, fs)

    // (7)
    go handleSigterm(ngx, func(code int) {
        os.Exit(code)
    })

    // (5)
    mux := http.NewServeMux()
    go registerHandlers(conf.EnableProfiling, conf.ListenPorts.Health, ngx, mux)

    // (6)
    ngx.Start()
}

1、解析命令行参数,得到 controller.Configuration 对象,Configuration 涵盖了所有 Ingress controller 所有需要的配置,比如 K8s APIServer 相关配置参数。

2、创建 ApiserverClient,根据 Configuration 的配置创建 K8s Client,创建之后调用 ServerVersion() API 以判断 API Server 是否可以正常连接。因为在一些环境中第一次可能连不上 API Server,所以这里有重试策略。

// cmd/nginx/main.go
func createApiserverClient(apiserverHost string, kubeConfig string) (*kubernetes.Clientset, error) {
    var v *discovery.Info

    // https://github.com/kubernetes/ingress-nginx/issues/1968
    // 在一些环境中可能第一次连不上 API server
    defaultRetry := wait.Backoff{
        Steps:    10,              // 尝试次数
        Duration: 1 * time.Second, // 基准的尝试时间
        Factor:   1.5,             // 持续时间 = 迭代次数 * 因子
        Jitter:   0.1,             // 每次迭代应用的抖动量
    }

    var lastErr error
    retries := 0
    glog.V(2).Info("trying to discover Kubernetes version")
    err = wait.ExponentialBackoff(defaultRetry, func() (bool, error) {
        v, err = client.Discovery().ServerVersion() // 查看版本号

        if err == nil {
            return true, nil
        }

        lastErr = err

        retries++
        return false, nil
    })
}

waitk8s.io/apimachinery 的组件,=ExponentialBackoff= 会根据 Backoff 提供的参数(将决定回调函数执行之后的 Sleep 间隔)去执行回调函数,是否继续执行由回调函数的返回值决定。代码不长,感兴趣可以自行查看 apimachinery/pkg/util/wait/wait.go#L175 源代码。

3、ingress-nginx 监听的 namespace 由参数中的 watch-namespace 来决定,如果为空则表示默认监听所有的 namespace。

4、创建 NGINXController,根据名字就知道了,NGINXController 是 ingress-controller 的 controller。

// internal/ingress/controller/nginx.go
func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXController {
    // 环境变量查看 NGINX_BINARY,如果未设置则使用 nginxBinary = "/usr/sbin/nginx"
    ngx := os.Getenv("NGINX_BINARY")
    if ngx == "" {
        ngx = nginxBinary
    }

    n := &NGINXController{
        binary: ngx,
        resolver:        h,
        cfg:             config,
        // (4.1)
        stopCh:   make(chan struct{}),
        // (4.2)
        updateCh: channels.NewRingChannel(1024),
        // (4.3)
        fileSystem: fs,
        // (4.4)
        runningConfig: &ingress.Configuration{},
    }

    // (4.5)
    n.store = store.New(
        fs,
        n.updateCh)

    // (4.6)
    n.syncQueue = task.NewTaskQueue(n.syncIngress)

    // (4.7)
    onTemplateChange := func() {
        template, err := ngx_template.NewTemplate(tmplPath, fs)
        if err != nil {
            // this error is different from the rest because it must be clear why nginx is not working
            glog.Errorf(`
-------------------------------------------------------------------------------
Error loading new template : %v
-------------------------------------------------------------------------------
`, err)
            return
        }

        n.t = template
        glog.Info("new NGINX template loaded")
        n.SetForceReload(true)
    }

    ngxTpl, err := ngx_template.NewTemplate(tmplPath, fs)
    if err != nil {
        glog.Fatalf("invalid NGINX template: %v", err)
    }

    n.t = ngxTpl

    _, err = watch.NewFileWatcher(tmplPath, onTemplateChange)
    if err != nil {
        glog.Fatalf("unexpected error creating file watcher: %v", err)
    }

    filesToWatch := []string{}
    err := filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }

        if info.IsDir() {
            return nil
        }

        filesToWatch = append(filesToWatch, path)
        return nil
    })

    if err != nil {
        glog.Fatalf("unexpected error creating file watcher: %v", err)
    }

    for _, f := range filesToWatch {
        _, err = watch.NewFileWatcher(f, func() {
            glog.Info("file %v changed. Reloading NGINX", f)
            n.SetForceReload(true)
        })
        if err != nil {
            glog.Fatalf("unexpected error creating file watcher: %v", err)
        }
    }
}

4.1、=stopCh= 服务是否需要终止管道,终止服务时只需要向管道中写入数据即可。

4.2、=updateCh= 是一个 RingChannel -> 环形管道,buffer 长度为 1024,环形的意思是不会阻塞管道写入,当空间用完时,新的元素将覆盖最早的元素_,使用了 eapache/channels 的实现方案,它对标准的 Channel 进行了扩展,常规的数据结构,后面我会专门写篇文章分析其数据结构。

4.3、=fileSystem= 封装了 K8s 的 filesystem(它也封装了标准库的一些文件操作),ingress-nginx 需要对本机的目录、文件进行操作,比如写 nginx 配置文件更新之后写入。比较简单,不展开讲了。

4.4、 runningConfig 保存了一份当前 nginx 运行配置对应的 =ingress.Configuration=,其主要作用在于当 ingress 需要更新时,判断新的配置与旧的配置是否相同,如果完全相同的话,就不需要 reload nginx 了。

4.5、store 对应的是 K8sStore 的实现,封装了 K8s 的 Informer=,注意这里传入的 =updateCh 和 NGINXController 中的 updateCh 是同一个(指针)。创建 Store 做了很多事情,需要注意(代码比较长,我进行了精简,只看思路):

// ingress/controller/store/store.go
// 参数省略了
func New() Storer {
    store := &k8sStore{
        informers: &Informer{},
        listers:   &Lister{},
        updateCh:  updateCh,
    }

    // (4.5.1)
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(glog.Infof)
    eventBroadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{
        Interface: client.CoreV1().Events(namespace),
    })
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{
        Component: "nginx-ingress-controller",
    })

    // (4.5.2)
    // create informers factory, enable and assign required informers
    infFactory := informers.NewFilteredSharedInformerFactory(client, resyncPeriod, namespace, func(*metav1.ListOptions) {})
    store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer()
    store.listers.Ingress.Store = store.informers.Ingress.GetStore()

    // Endpoint, Secret, ConfigMap, Service 和 Ingress 差不多,代码省略了

    ingEventHandler := cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            ing := obj.(*extensions.Ingress)
            if !class.IsValid(ing) {
                a, _ := parser.GetStringAnnotation(class.IngressKey, ing)
                return
            }
            recorder.Eventf(ing, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))

            // (4.5.3)
            updateCh.In() <- Event{
                Type: CreateEvent,
                Obj:  obj,
            }
        },
        DeleteFunc: func(obj interface{}) {
            ing, ok := obj.(*extensions.Ingress)
            recorder.Eventf(ing, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))

            // (4.5.3)
            updateCh.In() <- Event{
                Type: DeleteEvent,
                Obj:  obj,
            }
        },
        UpdateFunc: func(old, cur interface{}) {
            oldIng := old.(*extensions.Ingress)
            curIng := cur.(*extensions.Ingress)
            validOld := class.IsValid(oldIng)
            validCur := class.IsValid(curIng)
            if !validOld && validCur {
                recorder.Eventf(curIng, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
            } else if validOld && !validCur {
                recorder.Eventf(curIng, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
            } else if validCur && !reflect.DeepEqual(old, cur) {
                recorder.Eventf(curIng, corev1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
            }

            // (4.5.3)
            updateCh.In() <- Event{
                Type: UpdateEvent,
                Obj:  cur,
            }
        },
    }
    store.informers.Ingress.AddEventHandler(ingEventHandler)
    return store
}

4.5.1、使用了 K8s 的 record 和 watch 组件,简单来讲就是创建一个广播器,一方输入数据(recorder),输入的是事件(对象,类型,其它附加信息)-> =recorder.Eventf=,另外一方是 watcher,watcher 可以是 Logger、Sink 等,广播器负责将输入的数据广播给 watcher,后面我会写篇文章分析。

4.5.2、创建了 K8s 的 informers,informers 是 Client-go 的非常关键的的组件,内部实现极其复杂。当需要 List/Get K8s 中的 Objects 时,可以直接用 Informer 实例的 Lister() 方法。它最重要的功能是*监听事件并触发回调函数*。它可以监听 resource 的创建、更新和删除三种事件类型,回调函数只需要实现 AddFunc=、=DeleteFunc=、=UpdateFunc 三个函数即可。上面的 ingEventHandler 对应的是 ingress 的 EventHandler。这篇文章 对 Informer 实现进行了一定的说明,内部实现细节有时间我研究研究。

4.5.3、当收到增删改的事件时,解析出事件对应的类型和 Objects,然后添加到 updateCh 中。

5、创建 HTTP Server,添加 URL 及其 handler。相比其他地方,这里的代码很简单,看上面的大图即可。这里的 registerHandlers 居然是在协程里做的,如果是纯粹的 HTTP Server 的话,还会这么做吗?

6、启动服务,下篇文章详解。

7、处理 term 信号,下篇文章详解。

Date: 2018-06-13 10:52:40

Author: JerryZhang