16.深入k8s:Informer使用及其源码分析 (3)

sharedIndexInformer里面会创建sharedProcessor,设置List&Watch的回调函数,创建了一个indexer,我们这里看一下NewIndexer是怎么创建indexer的:

func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer { return &cache{ cacheStorage: NewThreadSafeStore(indexers, Indices{}), keyFunc: keyFunc, } }

NewIndexer方法创建了一个cache,它的keyFunc是DeletionHandlingMetaNamespaceKeyFunc,即接受一个object,生成它的namepace/name的字符串。cache里面的数据会存放到cacheStorage中,它是一个threadSafeMap用来存储资源对象并自带索引功能的本地存储。

注册EventHandler事件

EventHandler事件的注册是通过informer的AddEventHandler方法进行的。在调用AddEventHandler方法的时候,传入一个cache.ResourceEventHandlerFuncs结构体:

文件位置:tools/cache/shared_informer.go

func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) { s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) } func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) { s.startedLock.Lock() defer s.startedLock.Unlock() ... //初始化监听器 listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize) //如果informer还没启动,那么直接将监听器加入到processor监听器列表中 if !s.started { s.processor.addListener(listener) return } //如果informer已经启动,那么需要加锁 s.blockDeltas.Lock() defer s.blockDeltas.Unlock() s.processor.addListener(listener) //然后将indexer中缓存的数据写入到listener中 for _, item := range s.indexer.List() { listener.add(addNotification{newObj: item}) } }

AddEventHandler方法会调用到AddEventHandlerWithResyncPeriod方法中,然后调用newProcessListener初始化listener。

接着会校验informer是否已经启动,如果没有启动,那么直接将监听器加入到processor监听器列表中并返回;如果informer已经启动,那么需要加锁将监听器加入到processor监听器列表中,然后将indexer中缓存的数据写入到listener中。

需要注意的是listener.add方法会调用processorListener的add方法,这个方法会将数据写入到addCh管道中:

func (p *processorListener) add(notification interface{}) { p.addCh <- notification }

addCh管道里面数据是用来处理事件回调的,后面我会说到。

大致的流程如下:

image-20201017213620364

启动Informer模块

最后我们在上面的demo中会使用sharedIndexInformer的Run方法来启动Informer模块。

文件位置:tools/cache/shared_informer.go

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() //初始化DeltaFIFO队列 fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KnownObjects: s.indexer, EmitDeltaTypeReplaced: true, }) cfg := &Config{ //设置Queue为DeltaFIFO队列 Queue: fifo, //设置List&Watch的回调函数 ListerWatcher: s.listerWatcher, ObjectType: s.objectType, //设置Resync周期 FullResyncPeriod: s.resyncCheckPeriod, RetryOnError: false, //判断有哪些监听器到期需要被Resync ShouldResync: s.processor.shouldResync, Process: s.HandleDeltas, WatchErrorHandler: s.watchErrorHandler, } func() { s.startedLock.Lock() defer s.startedLock.Unlock() //异步创建controller s.controller = New(cfg) s.controller.(*controller).clock = s.clock s.started = true }() processorStopCh := make(chan struct{}) var wg wait.Group defer wg.Wait() // Wait for Processor to stop defer close(processorStopCh) // Tell Processor to stop wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run) //调用run方法启动processor wg.StartWithChannel(processorStopCh, s.processor.run) defer func() { s.startedLock.Lock() defer s.startedLock.Unlock() s.stopped = true }() //启动controller s.controller.Run(stopCh) }

这段代码主要做了以下几件事:

调用NewDeltaFIFOWithOptions方法初始化DeltaFIFO队列;

初始化Config结果体,作为创建controller的参数;

异步创建controller;

调用run方法启动processor;

调用run方法启动controller;

下面我们看看sharedProcessor的run方法做了什么:

func (p *sharedProcessor) run(stopCh <-chan struct{}) { func() { ... //遍历监听器 for _, listener := range p.listeners { //下面两个方法是核心的事件call back的方法 p.wg.Start(listener.run) p.wg.Start(listener.pop) } p.listenersStarted = true }() ... }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpwwwf.html