CSI如何运作的简要分析
最近一直在做CSI相关的工作,在开发的过程中,我觉得CSI的细节是相当繁琐的。通过整理CSI工作流程,可以加深对CSI的理解,与大家分享我对CSI的认识。
我将通过两篇文章介绍 CSI,第一篇将重点介绍 CSI 的基本组件和工作原理,基于 Kubernetes 作为 CSI 的 CO(Container Orchestration Systems)。第二篇将选取几个典型的CSI项目,分析具体实现。
CSI的基本组成部分
CSI 云提供商有两种类型,一种是树内类型,一种是树外类型。前者是运行在k8s核心组件内部的存储插件;后者是一个独立于 k8s 组件运行的存储插件。本文重点介绍树外类型的插件。
out-of-tree 类型的插件通过 gRPC 接口与 k8s 组件交互,k8s 提供了多个 SideCar 组件与 CSI 插件配合实现丰富的功能。对于out-of-tree插件,使用的组件分为SideCar组件和需要第三方实现的插件。
SideCar components
external-attacher

侦听 VolumeAttachment 对象并调用 CSI 驱动程序控制器服务的ControllerPublishVolume和ControllerUnpublishVolume接口以将卷附加到节点或从节点中删除卷。
如果存储系统需要附加/分离步骤,则需要此组件,因为 K8s 内部附加/分离控制器不直接调用 CSI 驱动程序接口。
external-provisioner

前提是PVC中指定的StorageClass的provisioner字段与CSI驱动Controller服务的CreateVolume和接口的返回值相同。身份服务DeleteVolume的接口返回相同的值。GetPluginInfo一旦配置了新卷,K8s 将创建相应的 PV。
如果PVC绑定的PV的回收策略是delete,则external-provisioner组件监听PVC的删除,并调用DeleteVolumeCSI驱动Controller服务的接口。一旦卷删除成功,组件也会删除对应的 PV。
该组件还支持从快照创建数据源。如果 PVC 中指定了 Snapshot CRD 数据源,则组件SnapshotContent在调用CreateVolume接口时会通过对象获取快照的相关信息并将该内容传递给 CSI 驱动程序,CSI 驱动程序需要根据数据创建卷源快照。
external-resizer

监听 PVC 对象,如果用户在 PVC 对象上请求更多存储,组件调用NodeExpandVolumeCSI 驱动控制器服务的接口,用于扩展卷。
external-snapshotter

该组件与 Snapshot Controller 结合使用,后者根据集群中创建的 Snapshot 对象创建相应的 VolumeSnapshotContent,以及监听 VolumeSnapshotContent 对象的 external-snapshotter。当监听到 VolumeSnapshotContent 时,将相应的参数传递给 CSI 驱动 Controller 服务通过CreateSnapshotRequest调用其CreateSnapshot接口。该组件还负责调用DeleteSnapshot和ListSnapshots接口。
livenessprobe

负责监控 CSI driver 的健康状况并通过 Liveness Probe 机制上报给 k8s,并在 CSI driver 检测到异常时重启 pod。
node-driver-registrar

通过直接调用NodeGetInfoCSI驱动Node服务的接口,通过kubelet的插件注册机制,将CSI驱动信息注册到对应节点的kubelet上。
external-health-monitor-controller

在 PVC 事件中,通过调用CSI 驱动控制器服务的ListVolumes或接口来检查和报告 CSI 卷的健康状况。ControllerGetVolume
external-health-monitor-agent

通过调用NodeGetVolumeStatsCSI driver Node 服务的接口检查 CSI 卷的健康状况,并在 pod 的事件中上报。
第三方插件
第三方存储提供者(即SP,Storage Provider)需要实现两个插件,Controller负责卷管理,部署为StatefulSet,Node负责将卷挂载到pod中,是在每个节点中部署为 DaemonSet。
CSI 插件通过 Unix Domani Socket gRPC 与 kubelet 和 k8s 外部组件交互,gRPC 定义了 SP 需要实现的三组 RPC 接口,以便与 k8s 外部组件进行通信。三组接口分别是CSI Identity、CSI Controller、CSI Node,下面详细定义。
CSI 身份
CSI Identity用于提供CSI驱动的身份信息,需要Controller和Node共同实现。接口如下。
service Identity {
rpc GetPluginInfo(GetPluginInfoRequest)
returns (GetPluginInfoResponse) {}
rpc GetPluginCapabilities(GetPluginCapabilitiesRequest)
returns (GetPluginCapabilitiesResponse) {}
rpc Probe (ProbeRequest)
returns (ProbeResponse) {}
}
GetPluginInfo是强制的,node-driver-registrar组件会调用该接口将CSI驱动注册到kubelet;GetPluginCapabilities用于表示CSI驱动主要提供哪些功能。
CSI 控制器
用于创建/删除卷、附加/分离卷、卷快照、卷扩展/收缩等。Controller插件需要实现这组接口。接口如下。
service Controller {
rpc CreateVolume (CreateVolumeRequest)
returns (CreateVolumeResponse) {}
rpc DeleteVolume (DeleteVolumeRequest)
returns (DeleteVolumeResponse) {}
rpc ControllerPublishVolume (ControllerPublishVolumeRequest)
returns (ControllerPublishVolumeResponse) {}
rpc ControllerUnpublishVolume (ControllerUnpublishVolumeRequest)
returns (ControllerUnpublishVolumeResponse) {}
rpc ValidateVolumeCapabilities (ValidateVolumeCapabilitiesRequest)
returns (ValidateVolumeCapabilitiesResponse) {}
rpc ListVolumes (ListVolumesRequest)
returns (ListVolumesResponse) {}
rpc GetCapacity (GetCapacityRequest)
returns (GetCapacityResponse) {}
rpc ControllerGetCapabilities (ControllerGetCapabilitiesRequest)
returns (ControllerGetCapabilitiesResponse) {}
rpc CreateSnapshot (CreateSnapshotRequest)
returns (CreateSnapshotResponse) {}
rpc DeleteSnapshot (DeleteSnapshotRequest)
returns (DeleteSnapshotResponse) {}
rpc ListSnapshots (ListSnapshotsRequest)
returns (ListSnapshotsResponse) {}
rpc ControllerExpandVolume (ControllerExpandVolumeRequest)
returns (ControllerExpandVolumeResponse) {}
rpc ControllerGetVolume (ControllerGetVolumeRequest)
returns (ControllerGetVolumeResponse) {
option (alpha_method) = true;
}
}
上面在k8s外部组件的介绍中提到,不同的组件针对不同的功能提供不同的接口。例如,CreateVolume/DeleteVolume可以与 external-provisioner 一起使用来创建/删除卷,ControllerPublishVolume/ControllerUnpublishVolume可以与 external-attacher 一起使用来附加/分离卷等。
CSI 节点
Node插件需要实现这一套接口,用于挂载/卸载卷、检查卷状态等,接口如下。
service Node {
rpc NodeStageVolume (NodeStageVolumeRequest)
returns (NodeStageVolumeResponse) {}
rpc NodeUnstageVolume (NodeUnstageVolumeRequest)
returns (NodeUnstageVolumeResponse) {}
rpc NodePublishVolume (NodePublishVolumeRequest)
returns (NodePublishVolumeResponse) {}
rpc NodeUnpublishVolume (NodeUnpublishVolumeRequest)
returns (NodeUnpublishVolumeResponse) {}
rpc NodeGetVolumeStats (NodeGetVolumeStatsRequest)
returns (NodeGetVolumeStatsResponse) {}
rpc NodeExpandVolume(NodeExpandVolumeRequest)
returns (NodeExpandVolumeResponse) {}
rpc NodeGetCapabilities (NodeGetCapabilitiesRequest)
returns (NodeGetCapabilitiesResponse) {}
rpc NodeGetInfo (NodeGetInfoRequest)
returns (NodeGetInfoResponse) {}
}
NodeStageVolume用于实现多个pod共享一个volume的功能,支持先将volume挂载到临时目录,再通过 挂载到pod NodePublishVolume;NodeUnstageVolume相反。
工作流程
下面看一下 Pod 安装卷的整个工作流程。该过程分为三个阶段:Provision/Delete、Attach/Detach 和 Mount/Unmount,但并非每个存储解决方案都会经历这三个阶段,例如 NFS 没有 Attach/Detach 阶段。
整个过程不仅涉及到上述组件的工作,还涉及到ControllerManager和kubelet的AttachDetachController和PVController组件,下面分别对Provision、Attach、Mount阶段进行详细分析。
Provision

我们先看Provision阶段,整个过程如上图所示。extenal-provisioner 和 PVController 都监视 PVC 资源。
- 当 PVController 观察到集群中创建了一个 PVC 时,它会判断是否存在与之匹配的 in-tree 插件,如果没有,则判断其存储类型为 out-of-tree,因此将 PVC 标注为
volume.beta.kubernetes.io/storage- provisioner={csi driver name}. CreateVolume当 extenal-provisioner 观察到 PVC 的注释 csi 驱动程序与它自己的 csi 驱动程序一致时,调用 CSI 控制器的接口。- 当
CreateVolumeCSI Controller 的接口返回成功时,extenal-provisioner 在集群中创建对应的 PV。 - 当 PVController 观察到集群中创建了 PV 时,它会将 PV 绑定到 PVC。
Attach

Attach阶段是指将一个卷附加到一个节点上,整个过程如上图所示。
- ADController 在 pod 被分派到节点并且正在使用 CSI 类型的 PV 时进行侦听,并调用内部树内 CSI 插件接口,该接口在集群中创建 VolumeAttachment 资源。
- external-attacher 组件监视要创建的 VolumeAttachment 资源并调用 CSI 控制器的
ControllerPublishVolume接口。 - 当
ControllerPublishVolume成功调用 CSI Controller 的接口时,external-attacher 将相应 VolumeAttachment 对象的 Attached 状态设置为 true。 - 当 ADController 观察到 VolumeAttachment 对象的 Attached 状态为真时,它会更新 ADController ActualStateOfWorld 的内部状态。
Mount

将卷挂载到 Pod 的最后一步涉及到 kubelet,整个过程简单描述为对应节点上的 kubelet 在创建 Pod 的过程中调用 CSI Node 插件进行挂载操作。以下是 kubelet 内部组件的细分。
首先,在syncPodkubelet 创建 pod 的 main 函数中,kubelet 调用WaitForAttachAndMount其子组件 volumeManager 的方法,等待卷挂载完成。
func (kl *Kubelet) syncPod(o syncPodOptions) error {
...
// Volume manager will not mount volumes for terminated pods
if !kl.podIsTerminated(pod) {
// Wait for volumes to attach/mount
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
klog.Errorf("Unable to attach or mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
return err
}
}
...
}
volumeManager 包含两个组件:desiredStateOfWorldPopulator 和 reconciler,它们共同完成 Pod 中卷的挂载和卸载过程。整个过程如下。

desiredStateOfWorldPopulator 和 reconciler 具有生产者-消费者模型。在volumeManager中维护了两个队列(技术上是一个接口,但在这里充当一个队列),分别是DesiredStateOfWorld和前者维护当前节点中volume的期望状态;后者维护当前节点中卷的实际状态。
desiredStateOfWorldPopulator 在自己的循环中只做了两件事,一是从 kubelet 的 podManager 中获取新创建的 Pod 并将其需要挂载的卷的信息记录到 DesiredStateOfWorld 中;另一件事是从当前节点的 podManager 中获取正在挂载的卷的信息到 DesiredStateOfWorld 中。另一件事是从当前节点的 podManager 中获取已删除的 pod,并检查它们的卷是否在 ActualStateOfWorld 记录中,如果没有,则将它们从 DesiredStateOfWorld 中删除,从而确保 DesiredStateOfWorld 记录所有卷的期望状态在节点中。这可确保 DesiredStateOfWorld 记录节点中所有卷的所需状态。相关代码如下(为了精简逻辑,去掉了部分代码)。
// Iterate through all pods and add to desired state of world if they don't
// exist but should
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
// Map unique pod name to outer volume name to MountedVolume.
mountedVolumesForPod := make(map[volumetypes.UniquePodName]map[string]cache.MountedVolume)
...
processedVolumesForFSResize := sets.NewString()
for _, pod := range dswp.podManager.GetPods() {
dswp.processPodVolumes(pod, mountedVolumesForPod, processedVolumesForFSResize)
}
}
// processPodVolumes processes the volumes in the given pod and adds them to the
// desired state of the world.
func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
pod *v1.Pod,
mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume,
processedVolumesForFSResize sets.String) {
uniquePodName := util.GetUniquePodName(pod)
...
for _, podVolume := range pod.Spec.Volumes {
pvc, volumeSpec, volumeGidValue, err :=
dswp.createVolumeSpec(podVolume, pod, mounts, devices)
// Add volume to desired state of world
_, err = dswp.desiredStateOfWorld.AddPodToVolume(
uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue)
dswp.actualStateOfWorld.MarkRemountRequired(uniquePodName)
}
}
协调者是消费者,它主要做三件事。
unmountVolumes(): 遍历ActualStateOfWorld中的volume,判断是否在DesiredStateOfWorld中,如果不是,则调用CSI Node的接口进行unmount,并记录在ActualStateOfWorld中。mountAttachVolumes():从DesiredStateOfWorld中获取需要挂载的volume,调用CSI Node的接口进行挂载或扩容,并记录在ActualStateOfWorld中。unmountDetachDevices(): 遍历ActualStateOfWorld中的volume,如果已经被attach了,但是没有pod在使用并且没有记录在DesiredStateOfWorld中,那么unmount/detach它。
我们以mountAttachVolumes()一个例子来看看它是如何调用CSI Node接口的。
func (rc *reconciler) mountAttachVolumes() {
// Ensure volumes that should be attached/mounted are attached/mounted.
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
volumeToMount.DevicePath = devicePath
if cache.IsVolumeNotAttachedError(err) {
...
} else if !volMounted || cache.IsRemountRequiredError(err) {
// Volume is not mounted, or is already mounted, but requires remounting
err := rc.operationExecutor.MountVolume(
rc.waitForAttachTimeout,
volumeToMount.VolumeToMount,
rc.actualStateOfWorld,
isRemount)
...
} else if cache.IsFSResizeRequiredError(err) {
err := rc.operationExecutor.ExpandInUseVolume(
volumeToMount.VolumeToMount,
rc.actualStateOfWorld)
...
}
}
}
mount 的执行是在 中完成的rc.operationExecutor,看 operationExecutor 代码。
func (oe *operationExecutor) MountVolume(
waitForAttachTimeout time.Duration,
volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
isRemount bool) error {
...
var generatedOperations volumetypes.GeneratedOperations
generatedOperations = oe.operationGenerator.GenerateMountVolumeFunc(
waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount)
// Avoid executing mount/map from multiple pods referencing the
// same volume in parallel
podName := nestedpendingoperations.EmptyUniquePodName
return oe.pendingOperations.Run(
volumeToMount.VolumeName, podName, "" /* nodeName */, generatedOperations)
}
这个函数先构造executor函数,然后执行,再看构造函数。
func (og *operationGenerator) GenerateMountVolumeFunc(
waitForAttachTimeout time.Duration,
volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
isRemount bool) volumetypes.GeneratedOperations {
volumePlugin, err :=
og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
mountVolumeFunc := func() volumetypes.OperationContext {
// Get mounter plugin
volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
volumeMounter, newMounterErr := volumePlugin.NewMounter(
volumeToMount.VolumeSpec,
volumeToMount.Pod,
volume.VolumeOptions{})
...
// Execute mount
mountErr := volumeMounter.SetUp(volume.MounterArgs{
FsUser: util.FsUserFrom(volumeToMount.Pod),
FsGroup: fsGroup,
DesiredSize: volumeToMount.DesiredSizeLimit,
FSGroupChangePolicy: fsGroupChangePolicy,
})
// Update actual state of world
markOpts := MarkVolumeOpts{
PodName: volumeToMount.PodName,
PodUID: volumeToMount.Pod.UID,
VolumeName: volumeToMount.VolumeName,
Mounter: volumeMounter,
OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName,
VolumeGidVolume: volumeToMount.VolumeGidValue,
VolumeSpec: volumeToMount.VolumeSpec,
VolumeMountState: VolumeMounted,
}
markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markOpts)
...
return volumetypes.NewOperationContext(nil, nil, migrated)
}
return volumetypes.GeneratedOperations{
OperationName: "volume_mount",
OperationFunc: mountVolumeFunc,
EventRecorderFunc: eventRecorderFunc,
CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePluginName, volumeToMount.VolumeSpec), "volume_mount"),
}
}
这里我们先去注册到kubelet的CSI的插件列表,找到对应的插件,然后执行volumeMounter.SetUp,最后更新ActualStateOfWorld记录。这里,外部CSI插件为csiMountMgr,代码如下。
func (c *csiMountMgr) SetUp(mounterArgs volume.MounterArgs) error {
return c.SetUpAt(c.GetPath(), mounterArgs)
}
func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
csi, err := c.csiClientGetter.Get()
...
err = csi.NodePublishVolume(
ctx,
volumeHandle,
readOnly,
deviceMountPath,
dir,
accessMode,
publishContext,
volAttribs,
nodePublishSecrets,
fsType,
mountOptions,
)
...
return nil
}
可以看到,CSI Node NodePublishVolume/ NodeUnPublishVolumeinterface是由kubelet中volumeManager的csiMountMgr调用的。
概括
本文从三个方面来分析整个 CSI 系统:CSI 组件,CSI 接口,以及 Volume 挂载到 Pod 的过程。
CSI 是整个容器生态系统的标准存储接口,CO 通过 gRPC 与 CSI 插件通信。为了实现通用性,k8s 设计了很多外部组件来配合 CSI 插件实现不同的功能,从而保证了 k8s 内部逻辑的纯洁性和 CSI 插件的简洁性。
