k8s排水命令源码阅读
kubectl
K8s 使用 cobra 作为命令行构建器(我觉得 cobra 不是很有用,文档也不清楚。)实际的处理逻辑pkg/kubectl/cmd/cmd.go在cmd/kubectl/kubectl.go
...
groups := templates.CommandGroups{
{
Message: "Basic Commands (Beginner):",
...
},
{
Message: "Deploy Commands:",
...
},
{
Message: "Cluster Management Commands:",
Commands: []*cobra.Command{
certificates.NewCmdCertificate(f, ioStreams),
clusterinfo.NewCmdClusterInfo(f, ioStreams),
top.NewCmdTop(f, ioStreams),
drain.NewCmdCordon(f, ioStreams),
drain.NewCmdUncordon(f, ioStreams),
drain.NewCmdDrain(f, ioStreams),
taint.NewCmdTaint(f, ioStreams),
},
},
...
}
groups.Add(cmds)
正如您在kubectl所有子命令的入口点中看到的那样,drain我们今天要查看的命令都是集群管理命令并且包含。
- cordon
- uncordon
- drain
cordon
让我们从命令开始,该cordon命令的目的是将一个节点标记为不可调度,以防止 K8s 在进行节点维护时将资源调度到该节点。
func NewCmdCordon(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
o := NewDrainCmdOptions(f, ioStreams)
cmd := &cobra.Command{
Use: "cordon NODE",
DisableFlagsInUseLine: true,
Short: i18n.T("Mark node as unschedulable"),
Long: cordonLong,
Example: cordonExample,
Run: func(cmd *cobra.Command, args []string) {
cmdutil.CheckErr(o.Complete(f, cmd, args))
cmdutil.CheckErr(o.RunCordonOrUncordon(true))
},
}
cmd.Flags().StringVarP(&o.drainer.Selector, "selector", "l", o.drainer.Selector, "Selector (label query) to filter on")
cmdutil.AddDryRunFlag(cmd)
return cmd
}
直接Run看. cmdutil.CheckErr_ o.Complete_ o.RunCordonOrUncordon的根本目的kubectl是向 APIServer 发送相应的 HTTP 请求,并通过 和实现kubectl一层封装,使各个子命令的实现统一简洁。一致和简洁。BuilderVisitor
// Builder provides convenience functions for taking arguments and parameters
// from the command line and converting them to a list of resources to iterate
// over using the Visitor interface.
type Builder struct {
...
}
// Visitor lets clients walk a list of resources.
type Visitor interface {
Visit(VisitorFunc) error
}
对应builder的构造在o.Complete:
...
// 根据命令行参数构建 builder 实例
builder := f.NewBuilder().
WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
NamespaceParam(o.Namespace).DefaultNamespace().
ResourceNames("nodes", args...).
SingleResourceType().
Flatten()
if len(o.drainer.Selector) > 0 {
builder = builder.LabelSelectorParam(o.drainer.Selector).
ResourceTypes("nodes")
}
// builder.Do 返回带有 Visitor 的 Result 对象
r := builder.Do()
查看builder.Do()下一步返回 Result 类型资源的操作。
func (b *Builder) Do() *Result {
// 调用 visitorResult 返回 Result 类型
r := b.visitorResult()
...
return r
}
...
func (b *Builder) visitorResult() *Result {
...
// 跳过其他步骤,直接看最简单的通过 Name 来获取 Result
if len(b.names) != 0 {
return b.visitByName()
}
...
}
...
func (b *Builder) visitByName() *Result {
// 声明 Result 对象
result := &Result{
singleItemImplied: len(b.names) == 1,
targetsSingleItems: true,
}
...
// 获取 K8s client
client, err := b.getClient(mapping.GroupVersionKind.GroupVersion())
...
visitors := []Visitor{}
for _, name := range b.names {
info := &Info{
Client: client,
Mapping: mapping,
Namespace: selectorNamespace,
Name: name,
Export: b.export,
}
visitors = append(visitors, info)
}
// VisitorList 也实现了 Visit 接口,遍历执行 Visitor 的 Visit 方法
result.visitor = VisitorList(visitors)
result.sources = visitors
return result
}
看完了如何获取Result类型的对象,我们来看看如何o.Complete处理,传入一个VisitorFunc,Result的访问者都实现了Visit接口,接口的作用Visit就是接收VisitorFunc并执行。接口的作用Visit是接收VisitorFunc并执行它。
return r.Visit(func(info *resource.Info, err error) error {
...
})
...
func (v DecoratedVisitor) Visit(fn VisitorFunc) error {
return v.visitor.Visit(func(info *Info, err error) error {
...
for i := range v.decorators {
if err := v.decorators[i](info, nil); err != nil {
return err
}
}
return fn(info, nil)
})
}
接下来,看看有什么o.RunCordonOrUncordon作用。
func (o *DrainCmdOptions) RunCordonOrUncordon(desired bool) error {
cordonOrUncordon := "cordon"
if !desired {
cordonOrUncordon = "un" + cordonOrUncordon
}
// 通过 Visit 获取到的 nodeInfos 列表
for _, nodeInfo := range o.nodeInfos {
...
gvk := nodeInfo.ResourceMapping().GroupVersionKind
if gvk.Kind == "Node" {
c, err := drain.NewCordonHelperFromRuntimeObject(nodeInfo.Object, scheme.Scheme, gvk)
if updateRequired := c.UpdateIfRequired(desired); !updateRequired {
...
} else {
if o.drainer.DryRunStrategy != cmdutil.DryRunClient {
...
// 修改对应节点的配置
err, patchErr := c.PatchOrReplace(o.drainer.Client, o.drainer.DryRunStrategy == cmdutil.DryRunServer)
...
}
}
}
...
}
return nil
}
...
func (c *CordonHelper) PatchOrReplace(clientset kubernetes.Interface, serverDryRun bool) (error, error) {
client := clientset.CoreV1().Nodes()
oldData, err := json.Marshal(c.node)
// 更新 node Spec 的 Unschedulable 字段
c.node.Spec.Unschedulable = c.desired
newData, err := json.Marshal(c.node)
// merge 数据,通过 diff 然后获取
patchBytes, patchErr := strategicpatch.CreateTwoWayMergePatch(oldData, newData, c.node)
if patchErr == nil {
...
_, err = client.Patch(context.TODO(), c.node.Name, types.StrategicMergePatchType, patchBytes, patchOptions)
}
...
}
Drain
在Cordon之后,去Drain。
func NewCmdDrain(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
...
cmd := &cobra.Command{
...
Run: func(cmd *cobra.Command, args []string) {
cmdutil.CheckErr(o.Complete(f, cmd, args))
cmdutil.CheckErr(o.RunDrain())
},
}
...
直接o.RunDrain看,我们首先看到的是执行o.RunCordonOrUncordon,将节点标记为不可调度,所以我之前写的博客其实是不正确的,如果要让节点下线,那就执行kubectl drain。
func (o *DrainCmdOptions) RunDrain() error {
if err := o.RunCordonOrUncordon(true); err != nil {
return err
}
...
drainedNodes := sets.NewString()
var fatal error
for _, info := range o.nodeInfos {
// 驱逐 Pod
if err := o.deleteOrEvictPodsSimple(info); err == nil {
drainedNodes.Insert(info.Name)
printObj(info.Object, o.Out)
} else {
// 如果驱逐 Pod 失败,则显示对应的 Node 信息
if len(remainingNodes) > 0 {
fmt.Fprintf(o.ErrOut, "There are pending nodes to be drained:\n")
for _, nodeName := range remainingNodes {
fmt.Fprintf(o.ErrOut, " %s\n", nodeName)
}
}
break
}
}
}
在deleteOrEvictPodsSimple中,首先通过 Node name 获取对应的 Pod 信息,然后执行 eviction 动作。
func (o *DrainCmdOptions) deleteOrEvictPodsSimple(nodeInfo *resource.Info) error {
list, errs := o.drainer.GetPodsForDeletion(nodeInfo.Name)
...
if err := o.drainer.DeleteOrEvictPods(list.Pods()); err != nil {
...
}
}
这里GetPodsForDeletion执行一个过滤器,其中包含以下场景的过滤器,需要注意的是,这里的过滤场景是有严格顺序的。
func (d *Helper) makeFilters() []podFilter {
return []podFilter{
// 被标记删除的 Pod(DeletionTimestamp 不为0)
d.skipDeletedFilter,
// 属于 DaemonSet 的 Pod
d.daemonSetFilter,
// mirror pod 其实就是 static pod,
// 是我们在 /etc/kubernetes/manifests/ 中定义的由 kubelet 负责生命周期管理的 Pod
// 在 `Annotations` 中会包含 `kubernetes.io/config.mirror`
d.mirrorPodFilter,
// 包含本地存储的 Pod,Pod 中的 Volume 字段不为空
d.localStorageFilter,
// 不属于 replicate 的 pod,`Controlled By` 不为空的 pod
d.unreplicatedFilter,
}
}
一旦获得过滤后的 Pod 列表,就会执行驱逐操作,从每个 Pod 的 goroutine 开始,并等待 Pod 驱逐完成。
func (d *Helper) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
returnCh := make(chan error, 1)
...
ctx, cancel := context.WithTimeout(d.getContext(), globalTimeout)
defer cancel()
for _, pod := range pods {
go func(pod corev1.Pod, returnCh chan error) {
for {
...
select {
case <-ctx.Done():
// 驱逐超时
returnCh <- fmt.Errorf("error when evicting pod %q: global timeout reached: %v", pod.Name, globalTimeout)
return
default:
}
// 驱逐 Pod 动作,最终执行 d.Client.PolicyV1beta1().Evictions(eviction.Namespace).Evict(eviction)
err := d.EvictPod(pod, policyGroupVersion)
...
}
...
params := waitForDeleteParams{
...
}
// 等待驱逐动作完成
_, err := waitForDelete(params)
if err == nil {
returnCh <- nil
} else {
returnCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err)
}
}(pod, returnCh)
}
waitForDelete如果没有立即完成,将ConditionFunc进入循环,其中检测到 Pod 存在并且 ObjectMeta UID 已更改。WaitForConditionFunc
func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error {
stopCh := make(chan struct{})
defer close(stopCh)
c := wait(stopCh)
for {
select {
case _, open := <-c:
ok, err := runConditionWithCrashProtection(fn)
if err != nil {
return err
}
if ok {
return nil
}
if !open {
return ErrWaitTimeout
}
case <-done:
return ErrWaitTimeout
}
}
}
概括
该kubectl drain命令的实现非常简单,没有特别复杂的逻辑。K8s 能够做到这一点的一个重要原因是所有的动作都是声明性的,不需要在声明后等待执行完成后主动做脏事。在 Pod 驱逐的情况下,并非所有 Pod 都会被驱逐到其他节点,因此需要特别注意在节点下线之前检查是否有简单的 Pod 资源仍在运行,或者是否有任何 Pod 使用本地存储,或类似的。
我已经看到这种设计模式的组合有一段时间了,我正在寻找机会重新学习它们。
