16.深入k8s:Informer使用及其源码分析 (4)

run方法会调用processorListener的run方法和pop方法,这两个方法合在一起完成了事件回调。

func (p *processorListener) add(notification interface{}) { p.addCh <- notification } func (p *processorListener) pop() { defer utilruntime.HandleCrash() defer close(p.nextCh) // Tell .run() to stop var nextCh chan<- interface{} var notification interface{} for { select { case nextCh <- notification: // Notification dispatched var ok bool notification, ok = p.pendingNotifications.ReadOne() if !ok { // Nothing to pop nextCh = nil // Disable this select case } case notificationToAdd, ok := <-p.addCh: if !ok { return } if notification == nil { notification = notificationToAdd nextCh = p.nextCh } else { p.pendingNotifications.WriteOne(notificationToAdd) } } } }

这段代码,我把add方法也贴到这里了,是因为监听的事件都是从这个方法传入的,然后写入到addCh管道中。

pop方法在select代码块中会获取addCh管道中的数据,第一个循环的时候notification是nil,所以会将nextCh设置为p.nextCh;第二个循环的时候会将数据写入到nextCh中。

当notification不为空的时候是直接将数据存入pendingNotifications缓存中的,取也是从pendingNotifications中读取。

下面我们看看run方法:

func (p *processorListener) run() { stopCh := make(chan struct{}) wait.Until(func() { for next := range p.nextCh { switch notification := next.(type) { case updateNotification: p.handler.OnUpdate(notification.oldObj, notification.newObj) case addNotification: p.handler.OnAdd(notification.newObj) case deleteNotification: p.handler.OnDelete(notification.oldObj) default: utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next)) } } // the only way to get here is if the p.nextCh is empty and closed close(stopCh) }, 1*time.Second, stopCh) }

run每秒遍历一次nextCh中的数据,然后根据不同的notification类型执行不同的回调方法,这里会回调到我们在main方法中注册的eventHandler。

下面我们再回到sharedIndexInformer的Run方法中往下走,会运行controller的Run方法。

文件位置:tools/cache/controller.go

func (c *controller) Run(stopCh <-chan struct{}) { ... //创建Reflector r := NewReflector( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, c.config.FullResyncPeriod, ) ... //启动Reflector wg.StartWithChannel(stopCh, r.Run) //每秒中循环调用DeltaFIFO队列的pop方法, wait.Until(c.processLoop, time.Second, stopCh) wg.Wait() }

这里对应Informer运行原理里面Informer上部分创建Reflector并进行监听,和下部分循环调用DeltaFIFO队列的pop方法进行分发。

启动Reflector进行监听

Reflector的Run方法最后会调用到Reflector的ListAndWatch方法进行监听获取资源。ListAndWatch代码会分为两部分,一部分是List,一部分是Watch。

我们先看List部分代码:

代码位置:tools/cache/reflector.go

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { ... if err := func() error { ... go func() { defer func() { if r := recover(); r != nil { panicCh <- r } }() pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { //根据参数获取pod 列表 return r.listerWatcher.List(opts) })) ... list, paginatedResult, err = pager.List(context.Background(), options) ... close(listCh) }() ... //获取资源版本号 resourceVersion = listMetaInterface.GetResourceVersion() initTrace.Step("Resource version extracted") //将资源数据转换成资源对象列表 items, err := meta.ExtractList(list) ... //将资源对象列表中的资源对象和资源版本号存储至DeltaFIFO队列中 if err := r.syncWith(items, resourceVersion); err != nil { return fmt.Errorf("unable to sync list result: %v", err) } ... r.setLastSyncResourceVersion(resourceVersion) return nil }(); err != nil { return err } ... }

这部分的代码会分为如下几个部分:

调用listerWatcher.List方法,获取资源下的所有对象的数据,这个方法会通过api调用到apiServer获取资源列表,代码我在上面已经贴出来了;

调用listMetaInterface.GetResourceVersion获取资源版本号;

调用meta.ExtractList方法将资源数据转换成资源对象列表;

将资源对象列表中的资源对象和资源版本号存储至DeltaFIFO队列中;

最后调用setLastSyncResourceVersion方法更新资源版本号;

下面看看Watch部分的代码:

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { ... for { ... //调用clientset客户端api与apiServer建立长连接,监控指定资源的变更 w, err := r.listerWatcher.Watch(options) ... //处理资源的变更事件 if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil { ... return nil } } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpwwwf.html