Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disk/snapshot: wait for cut #1225

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 24 additions & 20 deletions pkg/disk/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,20 @@ const (
DISK_RESIZE_PROCESSING_TIMEOUT = 30 * time.Second
)

// tenantUserUIDKey is used only for its address: &tenantUserUIDKey is the
// context key under which GetTenantUserUID and WithTenantUserUID look up and
// store the tenant user UID. Those accessors never read its int value.
var tenantUserUIDKey int

// GetTenantUserUID returns the tenant user UID stored in ctx under
// &tenantUserUIDKey. It returns "" when ctx carries no such value.
func GetTenantUserUID(ctx context.Context) string {
	// Comma-ok assertion: a missing (nil) value, or a value that is unexpectedly
	// not a string, yields "" instead of panicking.
	uid, _ := ctx.Value(&tenantUserUIDKey).(string)
	return uid
}

// WithTenantUserUID returns a context derived from ctx that carries
// tenantUserUID under &tenantUserUIDKey, where GetTenantUserUID reads it back.
func WithTenantUserUID(ctx context.Context, tenantUserUID string) context.Context {
	tenantCtx := context.WithValue(ctx, &tenantUserUIDKey, tenantUserUID)
	return tenantCtx
}

// attachDisk attaches an Alibaba Cloud disk to the given node (ECS instance).
func attachDisk(ctx context.Context, tenantUserUID, diskID, nodeID string, isSharedDisk, isSingleInstance bool, fromNode bool) (string, error) {
klog.Infof("AttachDisk: Starting Do AttachDisk: DiskId: %s, InstanceId: %s, Region: %v", diskID, nodeID, GlobalConfigVar.Region)
Expand Down Expand Up @@ -728,25 +742,6 @@ func findDiskByName(name string, ecsClient cloud.ECSInterface) (*ecs.Disk, error
}
return &disks[0], err
}

// findSnapshotByName asks ECS (through GlobalConfigVar.EcsClient) for the
// snapshots named name in the configured region.
//
// It returns the raw response, the number of matches, and an error:
//   - the DescribeSnapshots call failed: (nil, 0, err)
//   - no match:                          (response, 0, nil)
//   - exactly one match:                 (response, 1, nil)
//   - more than one match:               (response, count, codes.Internal error)
func findSnapshotByName(name string) (*ecs.DescribeSnapshotsResponse, int, error) {
	req := ecs.CreateDescribeSnapshotsRequest()
	req.RegionId = GlobalConfigVar.Region
	req.SnapshotName = name

	resp, err := GlobalConfigVar.EcsClient.DescribeSnapshots(req)
	if err != nil {
		return nil, 0, err
	}

	switch count := len(resp.Snapshots.Snapshot); {
	case count == 0:
		return resp, 0, nil
	case count > 1:
		// The response is still handed back so the caller can inspect the duplicates.
		return resp, count, status.Error(codes.Internal, "find more than one snapshot with name "+name)
	default:
		return resp, 1, nil
	}
}

func findDiskSnapshotByID(id string) (*ecs.Snapshot, error) {
describeSnapShotRequest := ecs.CreateDescribeSnapshotsRequest()
describeSnapShotRequest.RegionId = GlobalConfigVar.Region
Expand Down Expand Up @@ -845,7 +840,16 @@ func requestAndCreateSnapshot(ecsClient *ecs.Client, params *createSnapshotParam
// Do Snapshot create
snapshotResponse, err := ecsClient.CreateSnapshot(createSnapshotRequest)
if err != nil {
return nil, fmt.Errorf("create snapshot %s failed: %v", params.SnapshotName, err)
var aliErr *alicloudErr.ServerError
if errors.As(err, &aliErr) {
switch aliErr.ErrorCode() {
case IdempotentParameterMismatch:
return nil, status.Errorf(codes.AlreadyExists, "already created but parameter mismatch (RequestID: %s)", aliErr.RequestId())
case QuotaExceed_Snapshot:
return nil, status.Errorf(codes.ResourceExhausted, "snapshot quota exceeded: %v", err)
}
}
return nil, status.Errorf(codes.Internal, "create snapshot failed: %v", err)
}
return snapshotResponse, nil
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/disk/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ const (
SnapshotNotFound = "InvalidSnapshotId.NotFound"
InstanceNotFound = "InvalidInstanceId.NotFound"

QuotaExceed_Snapshot = "QuotaExceed.Snapshot"

// DiskHighAvail tag
DiskHighAvail = "available"
// MBSIZE tag
Expand Down
100 changes: 46 additions & 54 deletions pkg/disk/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/aliyun/alibaba-cloud-sdk-go/services/ecs"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/common"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/waitstatus"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/features"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils"
"google.golang.org/grpc/codes"
Expand All @@ -42,11 +43,13 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)

// controllerServer is the disk plugin's CSI controller server; it creates and
// deletes volumes and snapshots.
type controllerServer struct {
	// recorder is the event recorder set up by NewControllerServer.
	recorder record.EventRecorder
	// snapshotWaiter waits for a snapshot to reach a requested status
	// (CreateSnapshot uses it to wait for waitstatus.SnapshotCut).
	snapshotWaiter waitstatus.StatusWaiter[ecs.Snapshot]
	common.GenericControllerServer
}

Expand Down Expand Up @@ -80,10 +83,38 @@ var veasp = struct {

// delVolumeSnap is a package-level concurrent map.
// NOTE(review): its key/value types and its users are outside this view —
// presumably it tracks volume snapshots pending deletion; confirm against callers.
var delVolumeSnap sync.Map

// newSnapshotStatusWaiter builds the waiter used to wait for snapshot status
// changes, choosing the implementation from GlobalConfigVar.DiskMultiTenantEnable:
//
//   - multi-tenant disabled: a batched waiter whose PollHook returns a
//     DescribeSnapshots client built from updateEcsClient(GlobalConfigVar.EcsClient).
//     Its Run loop is started here in a goroutine with context.Background(), so
//     it is not tied to any request context.
//   - multi-tenant enabled: a simple waiter whose ClientFactory builds a
//     per-call ECS client from the tenant user UID carried in the context.
func newSnapshotStatusWaiter() waitstatus.StatusWaiter[ecs.Snapshot] {
	defaultClient := waitstatus.DescribeSnapshots{
		Client: GlobalConfigVar.EcsClient,
	}

	if !GlobalConfigVar.DiskMultiTenantEnable {
		batched := waitstatus.NewBatched(defaultClient, clock.RealClock{})
		// NOTE(review): PollHook is presumably invoked before each poll so a
		// refreshed client is used — confirm in the waitstatus package.
		batched.PollHook = func() waitstatus.ECSDescribeResources[ecs.Snapshot] {
			refreshed := updateEcsClient(GlobalConfigVar.EcsClient)
			return waitstatus.DescribeSnapshots{Client: refreshed}
		}
		go batched.Run(context.Background())
		return batched
	}

	simple := waitstatus.NewSimple(defaultClient, clock.RealClock{})
	simple.ClientFactory = func(ctx context.Context) (waitstatus.ECSDescribeResources[ecs.Snapshot], error) {
		// The tenant is identified by the UID attached via WithTenantUserUID.
		uid := GetTenantUserUID(ctx)
		tenantClient, err := getEcsClientByID("", uid)
		if err != nil {
			return nil, err
		}
		return waitstatus.DescribeSnapshots{Client: tenantClient}, nil
	}
	return simple
}

// NewControllerServer creates the disk controller server, wiring in an event
// recorder and the snapshot status waiter.
func NewControllerServer() csi.ControllerServer {
	return &controllerServer{
		recorder:       utils.NewEventRecorder(),
		snapshotWaiter: newSnapshotStatusWaiter(),
	}
}
Expand All @@ -100,9 +131,6 @@ func (cs *controllerServer) ControllerGetCapabilities(ctx context.Context, req *
}, nil
}

// createdSnapshotMap records the csi.Snapshot produced by CreateSnapshot,
// keyed by the request name (req.Name); DeleteSnapshot removes the entry by
// snapshot name.
// NOTE(review): this is a plain map and no locking is visible around its uses
// in this view — confirm it is safe under concurrent RPCs.
var createdSnapshotMap = map[string]*csi.Snapshot{}

// storageClassZonePos maps a multizone key to an index.
// NOTE(review): presumably the key identifies a multi-zone storage class and
// the value is the position of the next zone to use — confirm against callers.
var storageClassZonePos = map[string]int{}

Expand Down Expand Up @@ -523,42 +551,7 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS

klog.Infof("CreateSnapshot:: Starting to create snapshot: %+v", req)
sourceVolumeID := strings.Trim(req.GetSourceVolumeId(), " ")
// Need to check for already existing snapshot name
GlobalConfigVar.EcsClient = updateEcsClient(GlobalConfigVar.EcsClient)
snapshots, snapNum, err := findSnapshotByName(req.GetName())
switch {
case snapNum == 1:
// A snapshot with the same name already exists (err is nil here), so we need
// to check whether its sourceVolumeId is the same as in the new request.
existsSnapshot := snapshots.Snapshots.Snapshot[0]
if existsSnapshot.SourceDiskId == req.GetSourceVolumeId() {
csiSnapshot, err := formatCSISnapshot(&existsSnapshot)
if err != nil {
return nil, status.Errorf(codes.Internal, "format snapshot failed: %v", err)
}
klog.Infof("CreateSnapshot:: Snapshot already created: name[%s], sourceId[%s], status[%v]", req.Name, req.GetSourceVolumeId(), csiSnapshot.ReadyToUse)
if csiSnapshot.ReadyToUse {
klog.Infof("VolumeSnapshot: name: %s, id: %s is ready to use.", existsSnapshot.SnapshotName, existsSnapshot.SnapshotId)
}
return &csi.CreateSnapshotResponse{
Snapshot: csiSnapshot,
}, nil
}
return nil, status.Errorf(codes.AlreadyExists, "snapshot with the same name: %s but with different SourceVolumeId already exist", req.GetName())
case snapNum > 1:
return nil, status.Errorf(codes.Internal, "CreateSnapshot: get snapshot %s more than 1 instance", req.Name)
case err != nil:
return nil, status.Errorf(codes.Internal, "CreateSnapshot: get snapshot %s with error: %s", req.GetName(), err.Error())
}

// Check the in-memory record as well: if the RAM role has no permission to describe snapshots, the query above always returns 0 results.
if value, ok := createdSnapshotMap[req.Name]; ok {
str := fmt.Sprintf("CreateSnapshot:: Snapshot already created, Name: %s, Info: %v", req.Name, value)
klog.Info(str)
return &csi.CreateSnapshotResponse{
Snapshot: value,
}, nil
}

ecsClient, err := getEcsClientByID(sourceVolumeID, "")
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
Expand All @@ -571,28 +564,28 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
}

// init createSnapshotRequest and parameters
createAt := timestamppb.Now()
params.SourceVolumeID = sourceVolumeID
params.SnapshotName = req.Name
snapshotResponse, err := requestAndCreateSnapshot(ecsClient, params)

if err != nil {
return nil, status.Errorf(codes.Internal, "create snapshot[%s] with sourceId[%s] failed with error: %v", req.Name, req.GetSourceVolumeId(), err)
return nil, err
}

str := fmt.Sprintf("CreateSnapshot:: Snapshot create successful: snapshotName[%s], sourceId[%s], snapshotId[%s]", req.Name, req.GetSourceVolumeId(), snapshotResponse.SnapshotId)
klog.Infof(str)
csiSnapshot := &csi.Snapshot{
SnapshotId: snapshotResponse.SnapshotId,
SourceVolumeId: sourceVolumeID,
CreationTime: createAt,
ReadyToUse: false, // even if instant access is available, the snapshot is not ready to use immediately
SizeBytes: utils.Gi2Bytes(int64(disks[0].Size)),
klog.Infof("CreateSnapshot:: Snapshot create successful: snapshotName[%s], sourceId[%s], snapshotId[%s]", req.Name, req.GetSourceVolumeId(), snapshotResponse.SnapshotId)

snap, err := cs.snapshotWaiter.WaitFor(ctx, snapshotResponse.SnapshotId, waitstatus.SnapshotCut)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed while waiting for snapshot cut: %v", err)
}

csiSnap, err := formatCSISnapshot(snap)
if err != nil {
return nil, err
}

createdSnapshotMap[req.Name] = csiSnapshot
return &csi.CreateSnapshotResponse{
Snapshot: csiSnapshot,
Snapshot: csiSnap,
}, nil
}

Expand Down Expand Up @@ -665,7 +658,6 @@ func (cs *controllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
return nil, status.Errorf(codes.Internal, "delete snapshot %s with RequestId: %s, error: %v", snapshotID, reqId, err)
}

delete(createdSnapshotMap, snapshot.SnapshotName)
klog.Infof("DeleteSnapshot:: Successfully delete snapshot %s, requestId: %s", snapshotID, reqId)
return &csi.DeleteSnapshotResponse{}, nil
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/disk/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,15 @@ type nodeServer struct {
common.GenericNodeServer
}

// Disk status returned in ecs.DescribeDisks
const (
	// DiskStatusInuse disk inuse status
	DiskStatusInuse = "In_use"
	// DiskStatusAttaching disk attaching status
	DiskStatusAttaching = "Attaching"
	// DiskStatusDetaching disk detaching status
	DiskStatusDetaching = "Detaching"
	// DiskStatusAvailable disk available status
	DiskStatusAvailable = "Available"
)

const (
// DiskStatusAttached disk attached status
DiskStatusAttached = "attached"
// DiskStatusDetached disk detached status
Expand Down
Loading