16.深入k8s:Informer使用及其源码分析 (5)

这里会循环调用clientset客户端api与apiServer建立长连接,监控指定资源的变更,如果监控到有资源变更,那么会调用watchHandler处理资源的变更事件。

func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { ... loop: for { select { case <-stopCh: return errorStopRequested case err := <-errc: return err case event, ok := <-w.ResultChan(): ... // 获取资源版本号 newResourceVersion := meta.GetResourceVersion() switch event.Type { //将添加资源事件添加到DeltaFIFO队列中 case watch.Added: err := r.store.Add(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err)) } //将更新资源事件添加到DeltaFIFO队列中 case watch.Modified: err := r.store.Update(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err)) } //将删除资源事件添加到DeltaFIFO队列中 case watch.Deleted: err := r.store.Delete(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err)) } ... *resourceVersion = newResourceVersion r.setLastSyncResourceVersion(newResourceVersion) eventCount++ } } ... }

watchHandler方法会根据传入的资源类型调用不同的方法转换成不同的Delta然后存入到DeltaFIFO队列中。

processLoop分发DeltaFIFO队列中任务

processLoop方法,以1s为周期,周期性的执行。

文件位置:tools/cache/controller.go

func (c *controller) processLoop() { for { obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == ErrFIFOClosed { return } if c.config.RetryOnError { // This is the safe way to re-enqueue. c.config.Queue.AddIfNotPresent(obj) } } } }

这里会循环将DeltaFIFO队列中数据pop出队,然后交给Process方法进行处理,Process方法是在上面调用sharedIndexInformer的Run方法的数据设置,设置的方法是sharedIndexInformer的HandleDeltas方法。

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // from oldest to newest //根据obj的Type类型进行分发 for _, d := range obj.(Deltas) { switch d.Type { case Sync, Replaced, Added, Updated: s.cacheMutationDetector.AddObject(d.Object) //如果缓存中存在该对象 if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { //更新indexr if err := s.indexer.Update(d.Object); err != nil { return err } isSync := false switch { case d.Type == Sync: // Sync events are only propagated to listeners that requested resync isSync = true case d.Type == Replaced: //新老对象获取版本号进行比较 if accessor, err := meta.Accessor(d.Object); err == nil { if oldAccessor, err := meta.Accessor(old); err == nil { // Replaced events that didn't change resourceVersion are treated as resync events // and only propagated to listeners that requested resync isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion() } } } s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) // 如果缓存中不存在该对象 } else { if err := s.indexer.Add(d.Object); err != nil { return err } s.processor.distribute(addNotification{newObj: d.Object}, false) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil }

HandleDeltas会与indexer缓存交互更新我们从Delta FIFO中取到的内容,之后通过s.processor.distribute()进行消息的分发。

在distribute中,sharedProcesser通过listener.add(obj)向每个listener分发该object。而该函数中又执行了p.addCh <- notification。

func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() if sync { for _, listener := range p.syncingListeners { listener.add(obj) } } else { for _, listener := range p.listeners { listener.add(obj) } } }

这里可以结合上面的p.wg.Start(listener.run)和p.wg.Start(listener.pop)方法来进行理解,这里将notification传入到addCh管道之后会触发EventHandler事件。

这里我用一张图总结一下informer的Run方法流程:

image-20201017233145402

至此,我们分析完了informer的所有机制。

总结

通过上面分析,我们全面熟悉了k8s是如何通过Informer机制实现ListAndWatch获取并监视 API 对象变化。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpwwwf.html