K8s Informer原理解析,快速了解informer机制

k8s是典型的server-client架构。etcd存储集群的数据信息,apiserver作为统一的操作入口,任何对数据的操作都必须经过apiserver。
客户端通过ListAndWatch机制查询apiserver,而informer模块则封装了List-watch。

《kubernetes源码剖析》一书中的informer机制架构图:

整个架构大体分为以下几个部分:

Index

tools/cache/thread_safe_store.go中,定义了实现了线程安全的存储接口ThreadSafeStore:

type ThreadSafeStore interface {
   Add(key string, obj interface{})
   Update(key string, obj interface{})
   Delete(key string)
   Get(key string) (item interface{}, exists bool)
   List() []interface{}
   ListKeys() []string
   Replace(map[string]interface{}, string)
   Index(indexName string, obj interface{}) ([]interface{}, error)  //传入indexName和obj,返回所有和obj有相同index key的obj
   IndexKeys(indexName, indexKey string) ([]string, error)          // 传入indexName和index key,返回index key指定的所有obj key
   ListIndexFuncValues(name string) []string                        //获取indexName对应的index内的所有index key
   ByIndex(indexName, indexKey string) ([]interface{}, error)       //和IndexKeys方法类似,只是返回的是index key指定的所有obj
   GetIndexers() Indexers                                           //返回目前所有的indexers
   AddIndexers(newIndexers Indexers) error                         //存储数据前调用,添加indexer
   Resync() error    // Resync is a no-op and is deprecated
}

 

结构体threadSafeMap将资源对象数据存储于一个内存中的map数据结构中:

type threadSafeMap struct {
   lock  sync.RWMutex
   items map[string]interface{}    //实际存所有资源对象的地方
   indexers Indexers     
   indices Indices       
}

 

每次的增、删、改、查操作都会都会加锁,以保证数据的一致性。k8s.io/client-go/tools/cache/store.go中,定义了存储接口Store:
type Store interface {
    Add(obj interface{}) error
    Update(obj interface{}) error    
    Delete(obj interface{}) error
    List() []interface{}
    ListKeys() []string
    Get(obj interface{}) (item interface{}, exists bool, err error)
    GetByKey(key string) (item interface{}, exists bool, err error)
    Replace([]interface{}, string) error
    Resync() error
}

 

在tools/cache/index.go中,定义了Indexer接口:

type Indexer interface {
   Store    // 继承接口Store
   Index(indexName string, obj interface{}) ([]interface{}, error)   
   IndexKeys(indexName, indexedValue string) ([]string, error)   
   ListIndexFuncValues(indexName string) []string  
   ByIndex(indexName, indexedValue string) ([]interface{}, error)   
   GetIndexers() Indexers     
   AddIndexers(newIndexers Indexers) error    
}

 

还定义了一些数据结构:

type IndexFunc func(obj interface{}) ([]string, error) // 计算资源对象的index key的函数类型,值得注意的是,返回的是多个index key组成的列表
type Indexers map[string]IndexFunc // 计算索引的方法不止一个,通过给他们命名来加以区别,存储索引名(name)与索引方法(IndexFunc)的映射
type Indices map[string]Index      // 索引名(name)与索引(index)的映射
type Index map[string]sets.String  // 索引键(index key)与值(Obj Key)列表的映射

 

它们间的关系如图所示:

 

 

具体实现在tools/cache/store.go中的cache结构体:

type cache struct {   
   cacheStorage ThreadSafeStore   //cacheStorage是一个ThreadSafeStore类型的对象,实际使用的是threadSafeMap类型
   keyFunc KeyFunc   //用于计算资源对象的index key
}
type KeyFunc func(obj interface{}) (string, error)

 

cache结构体封装了threadSafeMap的很多方法,对外提供了Add、Update、Delete等方法;Indexer接口中规定需要实现的那些方法都是调用的threadSafeMap的实现

通过cache.NewIndexer(keyFunc, Indexers)初始化Indexer对象

  1.     keyFunc:k8s内部目前使用的自定义的indexFunc有PodPVCIndexFunc 、indexByPodNodeName 、MetaNamespaceIndexFunc
  2.     默认使用MetaNamespaceKeyFunc:根据资源对象计算出<namespace>/<name>格式的key,如果资源对象的<namespace>为空,则<name>作为key
  3.     Indexers:通过NewThreadSafeStore(indexers, Indices{})得到结构体内的cacheStorage

 

示例:

// 定义一个IndexFunc,功能为:根据Annotations的users字段返回index key
func UsersIndexFunc(obj interface{}) ([]string, error){
   pod := obj.(*v1.Pod)
   usersString := pod.Annotations["users"]
   return strings.Split(usersString, ","), nil
}
 
func main() {
   index := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"byUser":UsersIndexFunc})
 
   pod1 := &v1.Pod{ObjectMeta:metav1.ObjectMeta{Name:"one",Annotations:map[string]string{"users":"ernie,bert"}}}
   pod2 := &v1.Pod{ObjectMeta:metav1.ObjectMeta{Name:"two",Annotations:map[string]string{"users":"bert,oscar"}}}
   pod3 := &v1.Pod{ObjectMeta:metav1.ObjectMeta{Name:"three",Annotations:map[string]string{"users":"ernie,elmo"}}}
   
   //添加3个Pod资源对象
   index.Add(pod1)
   index.Add(pod2)
   index.Add(pod3)
 
   //通过index.ByIndex函数(通过执行索引器函数得到索引结果)查询byUser索引器下匹配ernie字段的Pod列表
   erniePods, err := index.ByIndex("byUser","ernie")
   if err != nil{
      panic(err)
   }
 
   for _, erniePods := range erniePods{
      fmt.Println(erniePods.(*v1.Pod).Name)
   }
}

 

DeltaFIFO

tools/cache/delta_fifo.go中定义了DeltaFIFO。Delta代表变化, FIFO则是先入先出的队列。

DeltaFIFO将接受来的资源event,转化为特定的变化类型,存储在队列中,周期性的POP出去,分发到事件处理器,并更新Indexer中的本地缓存。

 

DeltaType是string的别名,代表一种变化:

type DeltaType string

 

类型定义:

const (
   Added   DeltaType = "Added"
   Updated DeltaType = "Updated"
   Deleted DeltaType = "Deleted"
   Replaced DeltaType = “Replaced”  // 替换,list出错时,会触发relist,此时会替换
   Sync DeltaType = “Sync”   // 周期性的同步,底层会当作一个update类型处理
)

 

Delta由变化类型+资源对象组成:

type Delta struct {
   Type   DeltaType
   Object interface{}
}

 

Deltas是[]delta切片:

type Deltas []Delta

 

DeltaFIFO的定义:

type DeltaFIFO struct {
   lock sync.RWMutex  //读写锁
   cond sync.Cond    //条件变量
   items map[string]Deltas   //通过map数据结构的方式存储,value存储的是对象的Deltas数组
   queue []string     //存储资源对象的key,该key通过KeyOf(obj)函数计算得到
   populated bool   //通过Replace()接口将第一批对象放入队列,或者第一次调用增、删、改接口时标记为true
   initialPopulationCount int   //通过Replace()接口将第一批对象放入队列,或者第一次调用增、删、改接口时标记为true
   keyFunc KeyFunc
   knownObjects KeyListerGetter   //indexer
   closed     bool
   closedLock sync.Mutex
   emitDeltaTypeReplaced bool
}

 

向队列里添加元素:

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj  interface{}) error {
     id, err := f.KeyOf(obj)  //获取obj key
     if err != nil {
           return KeyError{obj, err}
     }
     //向items中添加delta,并对操作进行去重,目前来看,只有连续两次操作都是删除操作的情况下,才可以合并,其他操作不会合并
     newDeltas := append(f.items[id], Delta{actionType, obj})  
     newDeltas = dedupDeltas(newDeltas)
     if len(newDeltas) > 0 {
           //向queue和items中添加元素,添加以后,条件变量发出消息,通知可能正在阻塞的POP方法有事件进队列了
           if _, exists := f.items[id]; !exists {
                f.queue = append(f.queue, id)
           }
           f.items[id] = newDeltas
           f.cond.Broadcast()
     } else {
           // 冗余判断,其实是不会走到这个分支的,去重后的delta list长度怎么也不可能小于1
           delete(f.items, id)
     }
     return nil
}

 

从队列里Pop元素:

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{},  error) {
     f.lock.Lock()
     defer f.lock.Unlock()
     for {
           for len(f.queue) == 0 {   // 如果队列是空的,利用条件变量阻塞住,直到有新的delta
                if f.IsClosed() {    // 如果Close()被调用,则退出
                     return nil, ErrFIFOClosed
                }
                f.cond.Wait()
           }
           id := f.queue[0]
           f.queue = f.queue[1:]
           if f.initialPopulationCount > 0 {
                f.initialPopulationCount--
           }
           item, ok := f.items[id]
           if !ok {
                // Item may have been deleted subsequently.
                continue
           }
           delete(f.items, id)
           err := process(item)
           // 如果处理失败了,调用addIfNotPresent:如果queue中没有则添加。本身刚刚从queue和items中取出对象,应该不会存在重复的对象,这里调用addIfNotPresent应该只是为了保险起见
           if e, ok := err.(ErrRequeue); ok {
                f.addIfNotPresent(id, item)
                err = e.Err
           }
           // Don't need to copyDeltas here, because we're  transferring
           // ownership to the caller.
           return item, err
     }
}

 

Reflector

tools/cache/reflector.go中定义了Reflector:

type Reflector struct {
   name string
   expectedTypeName string     //被监控的资源的类型名
   expectedType reflect.Type   // 监控的对象类型
   expectedGVK *schema.GroupVersionKind
   store Store    // 存储,就是Delta_FIFO,这里的Store类型实际是Delta_FIFO的父类
   listerWatcher ListerWatcher  // 用来进行list&watch的接口对象
   backoffManager wait.BackoffManager
   resyncPeriod time.Duration   //重新同步的周期
   ShouldResync func() bool    //周期性的判断是否需要重新同步
   clock clock.Clock     //时钟对象,主要是为了给测试留后门,方便修改时间
   ……
}

 

同一类资源Informer共享一个Reflector。Reflector通过ListAndWatch函数来ListAndWatch apiserver来获取资源的数据。

获取时需要基于ResourceVersion(Etcd生成的全局唯一且递增的资源版本号)。通过此序号,客户端可以知道目前与服务端信息同步的状态,每次只取大于等于本地序号的事件。好处是可以实现事件的全局唯一,实现”断点续传“功能,不用担心本地客户端偶尔出现的网络异常

ListAndwatch是k8s统一的异步消息处理机制,保证了消息的实时性、可靠性、顺序性、性能等,为声明式风格的API奠定了良好的基础,是k8s架构的精髓。

List在Controller重启或Watch中断的情况下,调用资源的list API罗列资源对象以进行全量更新,基于HTTP短链接实现

(1)r.listerWatcher.List用于获取资源下的所有对象的数据,例如,获取所有Pod的资源数据。获取资源数据是由options的ResourceVersion控制的。如果ResourceVersion为0,则表示获取所有Pod的资源数据;如果ResourceVersion非0,则表示根据资源版本号继续获取。

(2)listMetaInterface.GetResourceVersion用于获取资源版本号。

(3)meta.ExtractList用于将资源数据(runtime.Object对象)转换成资源对象列表([]runtime.Object对象)。

因为r.listerWatcher.List获取的是资源下的所有对象的数据,例如所有的Pod资源数据,所以它是一个资源列表。

(4)r.syncWith用于将资源对象列表中的资源对象和资源版本号存储至DeltaFIFO中,并会替换已存在的对象。

(5)r.setLastSyncResourceVersion用于设置最新的资源版本号。

Watch则在多次List之间进行,调用资源的watch API,基于当前的资源版本号监听资源变更(如Added、Updated、Deleted)事件。

通过在Http请求中带上watch=true,表示采用Http长连接持续监听apiserver发来的资源变更事件。

apiserver在response的HTTP Header中设置Transfer-Encoding的值为chunked,表示采用分块传输编码。每当有事件来临,返回一个WatchEvent。

 

Reflector在获取新的资源数据后,调用的Add方法将资源对象的Delta记录存放到本地缓存DeltaFIFO中。

 

Controller

在tool/cache/controller.go中定义了Controller接口:

type Controller interface {
   Run(stopCh <-chan struct{})
   HasSynced() bool
   LastSyncResourceVersion() string
}

 

controller结构体实现了此接口:

type controller struct {
   config         Config
   reflector      *Reflector
   reflectorMutex sync.RWMutex
   clock          clock.Clock
}

 

config结构体中是所有配置:

type Config struct {
   Queue     //DeltaFIFO
   ListerWatcher
   Process ProcessFunc   //从DeltaFIFO Pop调用时,调用的回调
   ObjectType runtime.Object  //期待处理的资源对象的类型
   FullResyncPeriod time.Duration   //全量resync的周期
   ShouldResync ShouldResyncFunc  //delta fifo周期性同步判断时使用
   RetryOnError bool
}

 

Controller的processLoop方法会不断地调用的Pop方法从Delta队列中消费弹出delta记录(队列中没有数据时阻塞等待数据):

func (c *controller) processLoop() {
   for {
      obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
      if err != nil {
         if err == ErrFIFOClosed {
            return
         }
         if c.config.RetryOnError {
            // This is the safe way to re-enqueue.
            c.config.Queue.AddIfNotPresent(obj)
         }
      }
   }
}

Pop方法须传入Process函数——用于接收并处理对象的回调方法,默认的Process函数是Informer模块中的HandleDeltas

 

informer

Kubernetes的其他组件都是通过client-go的Informer机制与Kubernetes API Server进行通信的。

Informer也被称为Shared Informer,它是可以共享使用的。

在clientgo的informer/factory.go中,有接口定义:

type SharedInformerFactory interface {
   internalinterfaces.SharedInformerFactory
   ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
   WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
   // 所有已知资源的shared informer
   Admissionregistration() admissionregistration.Interface
   Apps() apps.Interface
   Auditregistration() auditregistration.Interface
   Autoscaling() autoscaling.Interface
   Batch() batch.Interface
   Certificates() certificates.Interface
   Coordination() coordination.Interface
   Core() core.Interface
   Discovery() discovery.Interface
   Events() events.Interface
   Extensions() extensions.Interface
   Flowcontrol() flowcontrol.Interface
   Networking() networking.Interface
   Node() node.Interface
   Policy() policy.Interface
   Rbac() rbac.Interface
   Scheduling() scheduling.Interface
   Settings() settings.Interface
   Storage() storage.Interface
}

 

sharedInformerFactory结构体实现了此接口:

type sharedInformerFactory struct {
   client           kubernetes.Interface
   namespace        string
   tweakListOptions internalinterfaces.TweakListOptionsFunc
   lock             sync.Mutex
   defaultResync    time.Duration
   customResync     map[reflect.Type]time.Duration
   informers map[reflect.Type]cache.SharedIndexInformer
   startedInformers map[reflect.Type]bool   //用于追踪哪种informer被启动了,避免同一资源的Informer被实例化多次,运行过多相同的ListAndWatch
}

 

新建一个sharedInformerFactory结构体:

sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)

 

第1个参数是用于与Kubernetes API Server交互的客户端;第2个参数用于设置多久进行一次resync(周期性的List操作),如果该参数为0,则禁用resync功能。

sharedInformerFactory结构体实现了所有已知资源的shared informer,例如在clientgo的informer/core/vi/pod.go中,定义了如下接口:

type PodInformer interface{
  Informer() cache.SharedIndexInformer
  Listen() v1.PodLister
}

 

podInformer结构体实现了Informer方法和Listen方法:

func (f *podInformer) Informer() cache.SharedIndexInformer {
   return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)  // 如果已经存在同类型的资源Informer,则返回当前Informer,不再继续添加
}
func (f *podInformer) Lister() v1.PodLister {
   return v1.NewPodLister(f.Informer().GetIndexer())
}

 

通过调用sharedInformers.Core().V1().Pods()获得podInformer结构体

得到具体Pod资源的informer对象:

informer := sharedInformers.Core().V1().Pods().Informer()

 

最终获得的,是clientgo/tool/cache/shared_informer.go中的sharedIndexInformer结构体,它实现的接口为:

type SharedIndexInformer interface {
   SharedInformer
   AddIndexers(indexers Indexers) error   //启动informer前为其添加indexers
   GetIndexer() Indexer
}

 

它的定义为:

type sharedIndexInformer struct {
   indexer    Indexer
   controller Controller
   processor             *sharedProcessor    
   cacheMutationDetector MutationDetector     
   listerWatcher ListerWatcher
   objectType runtime.Object        
   resyncCheckPeriod time.Duration  
   defaultEventHandlerResyncPeriod time.Duration
   clock clock.Clock
   started, stopped bool
   startedLock      sync.Mutex
   blockDeltas sync.Mutex
}

 

通过informer.AddEventHandler函数可以为资源对象添加资源事件回调方法,支持3种资源事件回调方法:AddFunc、UpdateFunc、DeleteFunc

sharedIndexInformer结构体定义了HandleDeltas函数,作为process回调函数(通过Config结构体传给controller)

当资源对象的操作类型为Added、Updated、Deleted时,会将该资源对象存储至Indexer,并通过distribute函数将资源对象分发至用户自定义的事件处理函数(通过informer.AddEventHandler添加)中

 

通过informer.Run(stopCH)运行该informer,它是一个持久化的goroutine,通过clientset对象与apiserver交互。

它会启动controller,启动时传入的Config结构体包含了

stopCH用于在程序进程退出前通知Informer退出

 

调用Pod的Informer的示例:

stopCH := make(chan struct{})
defer close(stopCH)
sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)   
informer := sharedInformers.Core().V1().Pods().Informer()     
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{     //为Pod资源添加资源事件回调方法
   AddFunc: func(obj interface{}){
      mObj := obj.(v1.Object)
      log.Print("创建新Pod:",mObj.GetName())
   },
   UpdateFunc: func(oldObj, newObj interface{}){
      oObj := oldObj.(v1.Object)
      nObj := newObj.(v1.Object)
      log.Print(oObj.GetName(),",",nObj.GetName())
   },
   DeleteFunc: func(obj interface{}) {
      mObj :=obj.(v1.Object)
      log.Print("删除旧Pod:",mObj.GetName())
   },
})
informer.Run(stopCH)

 

Work Queue(选用)

在开发并行程序时,需要频繁的进行数据同步,本身golang拥有channel 机制,但不能满足一些复杂场景的需求。例如:延时队列、限速队列。

client-go中提供了多种队列以供选择,可以胜任更多的场景。工作队列会对存储的对象进行去重,从而避免多个woker 处理同一个资源的情况。

用户可以在回调函数里,将资源对象推送到WorkQueue(或其他队列)中,也可以直接处理。

参考资料:

[1] https://kubernetes.io/docs/home/

[2] https://edu.aliyun.com/roadmap/cloudnative

[3] 郑东旭《Kubernetes源码剖析》

 

代码示例:通过informer采集event并存入ES

package main

import (
   "bytes"
   "context"
   "fmt"
   "github.com/elastic/go-elasticsearch/v7"
   "github.com/elastic/go-elasticsearch/v7/esapi"
   "k8s.io/api/events/v1beta1"
   "k8s.io/apimachinery/pkg/runtime"
   "k8s.io/apimachinery/pkg/util/json"
   "k8s.io/client-go/informers"
   "k8s.io/client-go/kubernetes"
   "k8s.io/client-go/tools/cache"
   "k8s.io/client-go/tools/clientcmd"
   "math/rand"
   "strconv"
   "time"
)

func mustSuccess(err error) {
   if err != nil {
      panic(err)
   }
}

func main() {
   rand.Seed(time.Now().UnixNano())
   config, err := clientcmd.BuildConfigFromFlags("", "/Users/qiulingwei/Projects/kube-goclient/kubeconfig")
   mustSuccess(err)

   clientset, err := kubernetes.NewForConfig(config)
   mustSuccess(err)
   sharedInformers := informers.NewSharedInformerFactory(clientset, 0)
   stopChan := make(chan struct{})
   defer close(stopChan)

   eventInformer := sharedInformers.Events().V1beta1().Events().Informer()
   addChan := make(chan v1beta1.Event)
   deleteChan := make(chan v1beta1.Event)
   eventInformer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
      AddFunc: func(obj interface{}) {
         unstructObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
         mustSuccess(err)
         event := &v1beta1.Event{}
         err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj, event)
         mustSuccess(err)
         addChan <- *event
      },
      UpdateFunc: func(oldObj, newObj interface{}) {
      },
      DeleteFunc: func(obj interface{}) {
      },
   }, 0)

   go func() {
      for  {
         select {
         case event := <- addChan:
            str, err := json.Marshal(&event)
            mustSuccess(err)
            esinsert(str)
            break
         case <-deleteChan:
            break
         }
      }
   }()
   eventInformer.Run(stopChan)
}

func esinsert(str []byte){
   cfg := elasticsearch.Config{
      Addresses: []string{
         "xxxxxx",
         "xxxxx",
         "xxxxx",
      },
      Username: "xxx",
      Password: "xxxxxx",
   }
   es, _ := elasticsearch.NewClient(cfg)
   req := esapi.CreateRequest{
      Index:        "qlw-index",
      DocumentType: "_doc",
      DocumentID:   strconv.FormatInt(time.Now().Unix(),10) + strconv.Itoa(rand.Int()),
      Body:         bytes.NewReader(str),
   }
   res, err := req.Do(context.Background(), es)
   defer res.Body.Close()
   if err!=nil  {
      fmt.Println(res.String())
   }
}

 

发表评论