16.深入k8s:Informer使用及其源码分析 (2)

文件位置:informers/factory.go

func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory { return NewSharedInformerFactoryWithOptions(client, defaultResync) } func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory { factory := &sharedInformerFactory{ client: client, namespace: v1.NamespaceAll, defaultResync: defaultResync, informers: make(map[reflect.Type]cache.SharedIndexInformer), startedInformers: make(map[reflect.Type]bool), customResync: make(map[reflect.Type]time.Duration), } // Apply all options for _, opt := range options { factory = opt(factory) } return factory }

NewSharedInformerFactory方法最终会调用到NewSharedInformerFactoryWithOptions初始化一个sharedInformerFactory,在初始化的时候会创建一个名为informers的map(以reflect.Type为键),用来缓存不同类型的informer。

informer 初始化

informer初始化会调用sharedInformerFactory的方法进行初始化,并且可以调用不同资源的Informer。

// Informer for Pod objects (core/v1).
podInformer := sharedInformers.Core().V1().Pods().Informer()
// Informer for Node objects (core/v1). Node().V1beta1().RuntimeClasses()
// would watch RuntimeClass objects instead of Nodes.
nodeInformer := sharedInformers.Core().V1().Nodes().Informer()

定义不同资源的Informer可以用来监控node或pod。

通过调用Informer方法会根据类型来创建Informer,同一类资源会共享同一个informer。

文件路径:informers/core/v1/pod.go(podInformer的defaultInformer和Informer方法)、informers/factory.go(InformerFor方法)

func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { //创建informer return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) } func (f *podInformer) Informer() cache.SharedIndexInformer { //传入上面定义的defaultInformer方法,用于创建informer return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer) } func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() //获取informer类型 informerType := reflect.TypeOf(obj) //查找map缓存,如果存在,那么直接返回 informer, exists := f.informers[informerType] if exists { return informer } //根据类型查找resync的周期 resyncPeriod, exists := f.customResync[informerType] if !exists { resyncPeriod = f.defaultResync } //调用defaultInformer方法创建informer informer = newFunc(f.client, resyncPeriod) f.informers[informerType] = informer return informer }

调用InformerFor方法的时候会传入defaultInformer方法用于创建informer。

InformerFor方法里面首先会去sharedInformerFactory的map缓存中根据类型查找对应的informer,如果存在那么直接返回,如果不存在,那么则会调用newFunc方法创建informer,然后设置到informers缓存中。

下面我们看一下NewFilteredPodInformer是如何创建Informer的:

文件位置:informers/core/v1/pod.go

func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { return cache.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { if tweakListOptions != nil { tweakListOptions(&options) } //调用apiserver获取pod列表 return client.CoreV1().Pods(namespace).List(context.TODO(), options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { if tweakListOptions != nil { tweakListOptions(&options) } //调用apiserver监控pod列表 return client.CoreV1().Pods(namespace).Watch(context.TODO(), options) }, }, &corev1.Pod{}, resyncPeriod, indexers, ) }

这里是真正的创建一个informer,并注册了List&Watch的回调函数,list回调函数的api类似下面这样:

// Build a GET request for the "pods" resource in namespace c.ns, carrying
// opts as versioned parameters and the given timeout, execute it with ctx,
// and decode the response into result.
result = &v1.PodList{}
err = c.client.Get().
	Namespace(c.ns).
	Resource("pods").
	VersionedParams(&opts, scheme.ParameterCodec).
	Timeout(timeout).
	Do(ctx).
	Into(result)

构造Informer通过NewSharedIndexInformer完成:

func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { realClock := &clock.RealClock{} sharedIndexInformer := &sharedIndexInformer{ processor: &sharedProcessor{clock: realClock}, indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), listerWatcher: lw, objectType: exampleObject, resyncCheckPeriod: defaultEventHandlerResyncPeriod, defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)), clock: realClock, } return sharedIndexInformer }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpwwwf.html