CSI如何运作的简要分析

最近一直在做CSI相关的工作,在开发的过程中,我觉得CSI的细节是相当繁琐的。通过整理CSI工作流程,可以加深对CSI的理解,与大家分享我对CSI的认识。

我将通过两篇文章介绍 CSI,第一篇将重点介绍 CSI 的基本组件和工作原理,基于 Kubernetes 作为 CSI 的 CO(Container Orchestration Systems)。第二篇将选取几个典型的CSI项目,分析具体实现。

 

CSI的基本组成部分

CSI 云提供商有两种类型,一种是树内类型,一种是树外类型。前者是运行在k8s核心组件内部的存储插件;后者是一个独立于 k8s 组件运行的存储插件。本文重点介绍树外类型的插件。

out-of-tree 类型的插件通过 gRPC 接口与 k8s 组件交互,k8s 提供了多个 SideCar 组件与 CSI 插件配合实现丰富的功能。对于out-of-tree插件,使用的组件分为SideCar组件和需要第三方实现的插件。

 

SideCar components

external-attacher

侦听 VolumeAttachment 对象并调用 CSI 驱动程序控制器服务的ControllerPublishVolumeControllerUnpublishVolume接口以将卷附加到节点或从节点中删除卷。

如果存储系统需要附加/分离步骤,则需要此组件,因为 K8s 内部附加/分离控制器不直接调用 CSI 驱动程序接口。

 

 

external-provisioner

前提是PVC中指定的StorageClass的provisioner字段与CSI驱动Controller服务的CreateVolume和接口的返回值相同。身份服务DeleteVolume的接口返回相同的值。GetPluginInfo一旦配置了新卷,K8s 将创建相应的 PV。

如果PVC绑定的PV的回收策略是delete,则external-provisioner组件监听PVC的删除,并调用DeleteVolumeCSI驱动Controller服务的接口。一旦卷删除成功,组件也会删除对应的 PV。

 

该组件还支持从快照创建数据源。如果 PVC 中指定了 Snapshot CRD 数据源,则组件SnapshotContent在调用CreateVolume接口时会通过对象获取快照的相关信息并将该内容传递给 CSI 驱动程序,CSI 驱动程序需要根据数据创建卷源快照。

 

external-resizer

监听 PVC 对象,如果用户在 PVC 对象上请求更多存储,组件调用NodeExpandVolumeCSI 驱动控制器服务的接口,用于扩展卷。

 

external-snapshotter

该组件与 Snapshot Controller 结合使用,后者根据集群中创建的 Snapshot 对象创建相应的 VolumeSnapshotContent,以及监听 VolumeSnapshotContent 对象的 external-snapshotter。当监听到 VolumeSnapshotContent 时,将相应的参数传递给 CSI 驱动 Controller 服务通过CreateSnapshotRequest调用其CreateSnapshot接口。该组件还负责调用DeleteSnapshotListSnapshots接口。

 

livenessprobe

负责监控 CSI driver 的健康状况并通过 Liveness Probe 机制上报给 k8s,并在 CSI driver 检测到异常时重启 pod。

 

 

node-driver-registrar

通过直接调用NodeGetInfoCSI驱动Node服务的接口,通过kubelet的插件注册机制,将CSI驱动信息注册到对应节点的kubelet上。

 

 

external-health-monitor-controller

在 PVC 事件中,通过调用CSI 驱动控制器服务的ListVolumes或接口来检查和报告 CSI 卷的健康状况。ControllerGetVolume

 

 

external-health-monitor-agent

通过调用NodeGetVolumeStatsCSI driver Node 服务的接口检查 CSI 卷的健康状况,并在 pod 的事件中上报。

 

 

第三方插件

第三方存储提供者(即SP,Storage Provider)需要实现两个插件,Controller负责卷管理,部署为StatefulSet,Node负责将卷挂载到pod中,是在每个节点中部署为 DaemonSet。

CSI 插件通过 Unix Domani Socket gRPC 与 kubelet 和 k8s 外部组件交互,gRPC 定义了 SP 需要实现的三组 RPC 接口,以便与 k8s 外部组件进行通信。三组接口分别是CSI Identity、CSI Controller、CSI Node,下面详细定义。

 

 

CSI 身份

CSI Identity用于提供CSI驱动的身份信息,需要Controller和Node共同实现。接口如下。

service Identity {
  rpc GetPluginInfo(GetPluginInfoRequest)
    returns (GetPluginInfoResponse) {}

  rpc GetPluginCapabilities(GetPluginCapabilitiesRequest)
    returns (GetPluginCapabilitiesResponse) {}

  rpc Probe (ProbeRequest)
    returns (ProbeResponse) {}
}

GetPluginInfo是强制的,node-driver-registrar组件会调用该接口将CSI驱动注册到kubelet;GetPluginCapabilities用于表示CSI驱动主要提供哪些功能。

 

CSI 控制器

用于创建/删除卷、附加/分离卷、卷快照、卷扩展/收缩等。Controller插件需要实现这组接口。接口如下。

service Controller {
  rpc CreateVolume (CreateVolumeRequest)
    returns (CreateVolumeResponse) {}

  rpc DeleteVolume (DeleteVolumeRequest)
    returns (DeleteVolumeResponse) {}

  rpc ControllerPublishVolume (ControllerPublishVolumeRequest)
    returns (ControllerPublishVolumeResponse) {}

  rpc ControllerUnpublishVolume (ControllerUnpublishVolumeRequest)
    returns (ControllerUnpublishVolumeResponse) {}

  rpc ValidateVolumeCapabilities (ValidateVolumeCapabilitiesRequest)
    returns (ValidateVolumeCapabilitiesResponse) {}

  rpc ListVolumes (ListVolumesRequest)
    returns (ListVolumesResponse) {}

  rpc GetCapacity (GetCapacityRequest)
    returns (GetCapacityResponse) {}

  rpc ControllerGetCapabilities (ControllerGetCapabilitiesRequest)
    returns (ControllerGetCapabilitiesResponse) {}

  rpc CreateSnapshot (CreateSnapshotRequest)
    returns (CreateSnapshotResponse) {}

  rpc DeleteSnapshot (DeleteSnapshotRequest)
    returns (DeleteSnapshotResponse) {}

  rpc ListSnapshots (ListSnapshotsRequest)
    returns (ListSnapshotsResponse) {}

  rpc ControllerExpandVolume (ControllerExpandVolumeRequest)
    returns (ControllerExpandVolumeResponse) {}

  rpc ControllerGetVolume (ControllerGetVolumeRequest)
    returns (ControllerGetVolumeResponse) {
        option (alpha_method) = true;
    }
}

上面在k8s外部组件的介绍中提到,不同的组件针对不同的功能提供不同的接口。例如,CreateVolume/DeleteVolume可以与 external-provisioner 一起使用来创建/删除卷,ControllerPublishVolume/ControllerUnpublishVolume可以与 external-attacher 一起使用来附加/分离卷等。

 

CSI 节点

Node插件需要实现这一套接口,用于挂载/卸载卷、检查卷状态等,接口如下。

service Node {
  rpc NodeStageVolume (NodeStageVolumeRequest)
    returns (NodeStageVolumeResponse) {}

  rpc NodeUnstageVolume (NodeUnstageVolumeRequest)
    returns (NodeUnstageVolumeResponse) {}

  rpc NodePublishVolume (NodePublishVolumeRequest)
    returns (NodePublishVolumeResponse) {}

  rpc NodeUnpublishVolume (NodeUnpublishVolumeRequest)
    returns (NodeUnpublishVolumeResponse) {}

  rpc NodeGetVolumeStats (NodeGetVolumeStatsRequest)
    returns (NodeGetVolumeStatsResponse) {}

  rpc NodeExpandVolume(NodeExpandVolumeRequest)
    returns (NodeExpandVolumeResponse) {}

  rpc NodeGetCapabilities (NodeGetCapabilitiesRequest)
    returns (NodeGetCapabilitiesResponse) {}

  rpc NodeGetInfo (NodeGetInfoRequest)
    returns (NodeGetInfoResponse) {}
}

NodeStageVolume用于实现多个pod共享一个volume的功能,支持先将volume挂载到临时目录,再通过 挂载到pod NodePublishVolumeNodeUnstageVolume相反。

 

工作流程

下面看一下 Pod 安装卷的整个工作流程。该过程分为三个阶段:Provision/Delete、Attach/Detach 和 Mount/Unmount,但并非每个存储解决方案都会经历这三个阶段,例如 NFS 没有 Attach/Detach 阶段。

整个过程不仅涉及到上述组件的工作,还涉及到ControllerManager和kubelet的AttachDetachController和PVController组件,下面分别对Provision、Attach、Mount阶段进行详细分析。

 

 

Provision

我们先看Provision阶段,整个过程如上图所示。extenal-provisioner 和 PVController 都监视 PVC 资源。

  1. 当 PVController 观察到集群中创建了一个 PVC 时,它会判断是否存在与之匹配的 in-tree 插件,如果没有,则判断其存储类型为 out-of-tree,因此将 PVC 标注为volume.beta.kubernetes.io/storage- provisioner={csi driver name}.
  2. CreateVolume当 extenal-provisioner 观察到 PVC 的注释 csi 驱动程序与它自己的 csi 驱动程序一致时,调用 CSI 控制器的接口。
  3. CreateVolumeCSI Controller 的接口返回成功时,extenal-provisioner 在集群中创建对应的 PV。
  4. 当 PVController 观察到集群中创建了 PV 时,它会将 PV 绑定到 PVC。

 

Attach

Attach阶段是指将一个卷附加到一个节点上,整个过程如上图所示。

  1. ADController 在 pod 被分派到节点并且正在使用 CSI 类型的 PV 时进行侦听,并调用内部树内 CSI 插件接口,该接口在集群中创建 VolumeAttachment 资源。
  2. external-attacher 组件监视要创建的 VolumeAttachment 资源并调用 CSI 控制器的ControllerPublishVolume接口。
  3. ControllerPublishVolume成功调用 CSI Controller 的接口时,external-attacher 将相应 VolumeAttachment 对象的 Attached 状态设置为 true。
  4. 当 ADController 观察到 VolumeAttachment 对象的 Attached 状态为真时,它会更新 ADController ActualStateOfWorld 的内部状态。

 

 

Mount

将卷挂载到 Pod 的最后一步涉及到 kubelet,整个过程简单描述为对应节点上的 kubelet 在创建 Pod 的过程中调用 CSI Node 插件进行挂载操作。以下是 kubelet 内部组件的细分。

首先,在syncPodkubelet 创建 pod 的 main 函数中,kubelet 调用WaitForAttachAndMount其子组件 volumeManager 的方法,等待卷挂载完成。

func (kl *Kubelet) syncPod(o syncPodOptions) error {
...
	// Volume manager will not mount volumes for terminated pods
	if !kl.podIsTerminated(pod) {
		// Wait for volumes to attach/mount
		if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
			klog.Errorf("Unable to attach or mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
			return err
		}
	}
...
}

 

volumeManager 包含两个组件:desiredStateOfWorldPopulator 和 reconciler,它们共同完成 Pod 中卷的挂载和卸载过程。整个过程如下。

desiredStateOfWorldPopulator 和 reconciler 具有生产者-消费者模型。在volumeManager中维护了两个队列(技术上是一个接口,但在这里充当一个队列),分别是DesiredStateOfWorld和前者维护当前节点中volume的期望状态;后者维护当前节点中卷的实际状态。

desiredStateOfWorldPopulator 在自己的循环中只做了两件事,一是从 kubelet 的 podManager 中获取新创建的 Pod 并将其需要挂载的卷的信息记录到 DesiredStateOfWorld 中;另一件事是从当前节点的 podManager 中获取正在挂载的卷的信息到 DesiredStateOfWorld 中。另一件事是从当前节点的 podManager 中获取已删除的 pod,并检查它们的卷是否在 ActualStateOfWorld 记录中,如果没有,则将它们从 DesiredStateOfWorld 中删除,从而确保 DesiredStateOfWorld 记录所有卷的期望状态在节点中。这可确保 DesiredStateOfWorld 记录节点中所有卷的所需状态。相关代码如下(为了精简逻辑,去掉了部分代码)。

// Iterate through all pods and add to desired state of world if they don't
// exist but should
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
	// Map unique pod name to outer volume name to MountedVolume.
	mountedVolumesForPod := make(map[volumetypes.UniquePodName]map[string]cache.MountedVolume)
	...
	processedVolumesForFSResize := sets.NewString()
	for _, pod := range dswp.podManager.GetPods() {
		dswp.processPodVolumes(pod, mountedVolumesForPod, processedVolumesForFSResize)
	}
}

// processPodVolumes processes the volumes in the given pod and adds them to the
// desired state of the world.
func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
	pod *v1.Pod,
	mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume,
	processedVolumesForFSResize sets.String) {
	uniquePodName := util.GetUniquePodName(pod)
    ...
	for _, podVolume := range pod.Spec.Volumes {   
		pvc, volumeSpec, volumeGidValue, err :=
			dswp.createVolumeSpec(podVolume, pod, mounts, devices)

		// Add volume to desired state of world
		_, err = dswp.desiredStateOfWorld.AddPodToVolume(
			uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue)
		dswp.actualStateOfWorld.MarkRemountRequired(uniquePodName)
    }
}

 

协调者是消费者,它主要做三件事。

  1. unmountVolumes(): 遍历ActualStateOfWorld中的volume,判断是否在DesiredStateOfWorld中,如果不是,则调用CSI Node的接口进行unmount,并记录在ActualStateOfWorld中。
  2. mountAttachVolumes():从DesiredStateOfWorld中获取需要挂载的volume,调用CSI Node的接口进行挂载或扩容,并记录在ActualStateOfWorld中。
  3. unmountDetachDevices(): 遍历ActualStateOfWorld中的volume,如果已经被attach了,但是没有pod在使用并且没有记录在DesiredStateOfWorld中,那么unmount/detach它。

我们以mountAttachVolumes()一个例子来看看它是如何调用CSI Node接口的。

func (rc *reconciler) mountAttachVolumes() {
	// Ensure volumes that should be attached/mounted are attached/mounted.
	for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
		volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
		volumeToMount.DevicePath = devicePath
		if cache.IsVolumeNotAttachedError(err) {
			...
		} else if !volMounted || cache.IsRemountRequiredError(err) {
			// Volume is not mounted, or is already mounted, but requires remounting
			err := rc.operationExecutor.MountVolume(
				rc.waitForAttachTimeout,
				volumeToMount.VolumeToMount,
				rc.actualStateOfWorld,
				isRemount)
			...
		} else if cache.IsFSResizeRequiredError(err) {
			err := rc.operationExecutor.ExpandInUseVolume(
				volumeToMount.VolumeToMount,
				rc.actualStateOfWorld)
			...
		}
	}
}

 

mount 的执行是在 中完成的rc.operationExecutor,看 operationExecutor 代码。

func (oe *operationExecutor) MountVolume(
	waitForAttachTimeout time.Duration,
	volumeToMount VolumeToMount,
	actualStateOfWorld ActualStateOfWorldMounterUpdater,
	isRemount bool) error {
	...
	var generatedOperations volumetypes.GeneratedOperations
		generatedOperations = oe.operationGenerator.GenerateMountVolumeFunc(
			waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount)

	// Avoid executing mount/map from multiple pods referencing the
	// same volume in parallel
	podName := nestedpendingoperations.EmptyUniquePodName

	return oe.pendingOperations.Run(
		volumeToMount.VolumeName, podName, "" /* nodeName */, generatedOperations)
}

 

这个函数先构造executor函数,然后执行,再看构造函数。

func (og *operationGenerator) GenerateMountVolumeFunc(
	waitForAttachTimeout time.Duration,
	volumeToMount VolumeToMount,
	actualStateOfWorld ActualStateOfWorldMounterUpdater,
	isRemount bool) volumetypes.GeneratedOperations {

	volumePlugin, err :=
		og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)

	mountVolumeFunc := func() volumetypes.OperationContext {
		// Get mounter plugin
		volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
		volumeMounter, newMounterErr := volumePlugin.NewMounter(
			volumeToMount.VolumeSpec,
			volumeToMount.Pod,
			volume.VolumeOptions{})
		...
		// Execute mount
		mountErr := volumeMounter.SetUp(volume.MounterArgs{
			FsUser:              util.FsUserFrom(volumeToMount.Pod),
			FsGroup:             fsGroup,
			DesiredSize:         volumeToMount.DesiredSizeLimit,
			FSGroupChangePolicy: fsGroupChangePolicy,
		})
		// Update actual state of world
		markOpts := MarkVolumeOpts{
			PodName:             volumeToMount.PodName,
			PodUID:              volumeToMount.Pod.UID,
			VolumeName:          volumeToMount.VolumeName,
			Mounter:             volumeMounter,
			OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName,
			VolumeGidVolume:     volumeToMount.VolumeGidValue,
			VolumeSpec:          volumeToMount.VolumeSpec,
			VolumeMountState:    VolumeMounted,
		}

		markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markOpts)
		...
		return volumetypes.NewOperationContext(nil, nil, migrated)
	}

	return volumetypes.GeneratedOperations{
		OperationName:     "volume_mount",
		OperationFunc:     mountVolumeFunc,
		EventRecorderFunc: eventRecorderFunc,
		CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePluginName, volumeToMount.VolumeSpec), "volume_mount"),
	}
}

这里我们先去注册到kubelet的CSI的插件列表,找到对应的插件,然后执行volumeMounter.SetUp,最后更新ActualStateOfWorld记录。这里,外部CSI插件为csiMountMgr,代码如下。

func (c *csiMountMgr) SetUp(mounterArgs volume.MounterArgs) error {
	return c.SetUpAt(c.GetPath(), mounterArgs)
}

func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
	csi, err := c.csiClientGetter.Get()
	...

	err = csi.NodePublishVolume(
		ctx,
		volumeHandle,
		readOnly,
		deviceMountPath,
		dir,
		accessMode,
		publishContext,
		volAttribs,
		nodePublishSecrets,
		fsType,
		mountOptions,
	)
    ...
	return nil
}

 

可以看到,CSI Node NodePublishVolumeNodeUnPublishVolumeinterface是由kubelet中volumeManager的csiMountMgr调用的。

概括

本文从三个方面来分析整个 CSI 系统:CSI 组件,CSI 接口,以及 Volume 挂载到 Pod 的过程。

CSI 是整个容器生态系统的标准存储接口,CO 通过 gRPC 与 CSI 插件通信。为了实现通用性,k8s 设计了很多外部组件来配合 CSI 插件实现不同的功能,从而保证了 k8s 内部逻辑的纯洁性和 CSI 插件的简洁性。

 

发表评论