Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DONTMERGE]feat: add vmss and vm etag support for track1 SDK #7698

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
29 changes: 28 additions & 1 deletion pkg/azureclients/armclient/azure_armclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package armclient
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"html"
"net"
Expand Down Expand Up @@ -430,6 +431,16 @@ func (c *Client) waitAsync(ctx context.Context, futures map[string]*azure.Future

// PutResourcesInBatches is similar with PutResources, but it sends sync request concurrently in batches.
func (c *Client) PutResourcesInBatches(ctx context.Context, resources map[string]interface{}, batchSize int) map[string]*PutResourcesResponse {
return c.PutResourcesInBatchesBase(ctx, resources, batchSize, false)
}

// PutResourcesInBatchesWithEtag is similar with PutResources, but it sends sync request concurrently in batches with Etag header when Etag field is not empty.
func (c *Client) PutResourcesInBatchesWithEtag(ctx context.Context, resources map[string]interface{}, batchSize int) map[string]*PutResourcesResponse {
return c.PutResourcesInBatchesBase(ctx, resources, batchSize, true)
}

// PutResourcesInBatches is similar with PutResources, but it sends sync request concurrently in batches.
func (c *Client) PutResourcesInBatchesBase(ctx context.Context, resources map[string]interface{}, batchSize int, enableEtag bool) map[string]*PutResourcesResponse {
if len(resources) == 0 {
return nil
}
Expand Down Expand Up @@ -458,7 +469,23 @@ func (c *Client) PutResourcesInBatches(ctx context.Context, resources map[string
go func(resourceID string, parameters interface{}) {
defer wg.Done()
defer func() { <-rateLimiter }()
future, rerr := c.PutResourceAsync(ctx, resourceID, parameters)
decorators := []autorest.PrepareDecorator{}
if enableEtag {
type etagPlaceholder struct {
Etag *string `json:"etag,omitempty"`
}

p := &etagPlaceholder{}
b, err := json.Marshal(parameters)
if err == nil {
err = json.Unmarshal(b, &p)
if err == nil && p.Etag != nil {
decorators = append(decorators, autorest.WithHeader("If-Match", autorest.String(*p.Etag)))
}
}
}

future, rerr := c.PutResourceAsync(ctx, resourceID, parameters, decorators...)
if rerr != nil {
responseLock.Lock()
responses[resourceID] = &PutResourcesResponse{
Expand Down
3 changes: 3 additions & 0 deletions pkg/azureclients/armclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type Interface interface {
// PutResourcesInBatches is similar with PutResources, but it sends sync request concurrently in batches.
PutResourcesInBatches(ctx context.Context, resources map[string]interface{}, batchSize int) map[string]*PutResourcesResponse

// PutResourcesInBatchesWithEtag is similar with PutResources, but it sends sync request concurrently in batches with Etag header when Etag field is not empty.
PutResourcesInBatchesWithEtag(ctx context.Context, resources map[string]interface{}, batchSize int) map[string]*PutResourcesResponse

// PatchResource patches a resource by resource ID
PatchResource(ctx context.Context, resourceID string, parameters interface{}, decorators ...autorest.PrepareDecorator) (*http.Response, *retry.Error)

Expand Down
14 changes: 14 additions & 0 deletions pkg/azureclients/armclient/mockarmclient/interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 32 additions & 21 deletions pkg/azureclients/vmssclient/azure_vmssclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,20 @@ func New(config *azclients.ClientConfig) *Client {
}

// Get gets a VirtualMachineScaleSet.
func (c *Client) Get(ctx context.Context, resourceGroupName string, VMScaleSetName string) (compute.VirtualMachineScaleSet, *retry.Error) {
func (c *Client) Get(ctx context.Context, resourceGroupName string, VMScaleSetName string) (VirtualMachineScaleSet, *retry.Error) {
mc := metrics.NewMetricContext("vmss", "get", resourceGroupName, c.subscriptionID, "")

// Report errors if the client is rate limited.
if !c.rateLimiterReader.TryAccept() {
mc.RateLimitedCount()
return compute.VirtualMachineScaleSet{}, retry.GetRateLimitError(false, "VMSSGet")
return VirtualMachineScaleSet{}, retry.GetRateLimitError(false, "VMSSGet")
}

// Report errors if the client is throttled.
if c.RetryAfterReader.After(time.Now()) {
mc.ThrottledCount()
rerr := retry.GetThrottlingError("VMSSGet", "client throttled", c.RetryAfterReader)
return compute.VirtualMachineScaleSet{}, rerr
return VirtualMachineScaleSet{}, rerr
}

result, rerr := c.getVMSS(ctx, resourceGroupName, VMScaleSetName)
Expand All @@ -118,14 +118,14 @@ func (c *Client) Get(ctx context.Context, resourceGroupName string, VMScaleSetNa
}

// getVMSS gets a VirtualMachineScaleSet.
func (c *Client) getVMSS(ctx context.Context, resourceGroupName string, VMScaleSetName string) (compute.VirtualMachineScaleSet, *retry.Error) {
func (c *Client) getVMSS(ctx context.Context, resourceGroupName string, VMScaleSetName string) (VirtualMachineScaleSet, *retry.Error) {
resourceID := armclient.GetResourceID(
c.subscriptionID,
resourceGroupName,
vmssResourceType,
VMScaleSetName,
)
result := compute.VirtualMachineScaleSet{}
result := VirtualMachineScaleSet{}

response, rerr := c.armClient.GetResource(ctx, resourceID)
defer c.armClient.CloseResponse(ctx, response)
Expand All @@ -148,7 +148,7 @@ func (c *Client) getVMSS(ctx context.Context, resourceGroupName string, VMScaleS
}

// List gets a list of VirtualMachineScaleSets in the resource group.
func (c *Client) List(ctx context.Context, resourceGroupName string) ([]compute.VirtualMachineScaleSet, *retry.Error) {
func (c *Client) List(ctx context.Context, resourceGroupName string) ([]VirtualMachineScaleSet, *retry.Error) {
mc := metrics.NewMetricContext("vmss", "list", resourceGroupName, c.subscriptionID, "")

// Report errors if the client is rate limited.
Expand Down Expand Up @@ -179,13 +179,13 @@ func (c *Client) List(ctx context.Context, resourceGroupName string) ([]compute.
}

// listVMSS gets a list of VirtualMachineScaleSets in the resource group.
func (c *Client) listVMSS(ctx context.Context, resourceGroupName string) ([]compute.VirtualMachineScaleSet, *retry.Error) {
func (c *Client) listVMSS(ctx context.Context, resourceGroupName string) ([]VirtualMachineScaleSet, *retry.Error) {
resourceID := armclient.GetResourceListID(
c.subscriptionID,
resourceGroupName,
vmssResourceType,
)
result := make([]compute.VirtualMachineScaleSet, 0)
result := make([]VirtualMachineScaleSet, 0)
page := &VirtualMachineScaleSetListResultPage{}
page.fn = c.listNextResults

Expand Down Expand Up @@ -221,7 +221,7 @@ func (c *Client) listVMSS(ctx context.Context, resourceGroupName string) ([]comp
}

// CreateOrUpdate creates or updates a VirtualMachineScaleSet.
func (c *Client) CreateOrUpdate(ctx context.Context, resourceGroupName string, VMScaleSetName string, parameters compute.VirtualMachineScaleSet) *retry.Error {
func (c *Client) CreateOrUpdate(ctx context.Context, resourceGroupName string, VMScaleSetName string, parameters VirtualMachineScaleSet, etag string) *retry.Error {
mc := metrics.NewMetricContext("vmss", "create_or_update", resourceGroupName, c.subscriptionID, "")

// Report errors if the client is rate limited.
Expand All @@ -237,7 +237,7 @@ func (c *Client) CreateOrUpdate(ctx context.Context, resourceGroupName string, V
return rerr
}

rerr := c.createOrUpdateVMSS(ctx, resourceGroupName, VMScaleSetName, parameters)
rerr := c.createOrUpdateVMSS(ctx, resourceGroupName, VMScaleSetName, parameters, etag)
mc.Observe(rerr)
if rerr != nil {
if rerr.IsThrottled() {
Expand All @@ -252,7 +252,7 @@ func (c *Client) CreateOrUpdate(ctx context.Context, resourceGroupName string, V
}

// CreateOrUpdateAsync sends the request to arm client and DO NOT wait for the response
func (c *Client) CreateOrUpdateAsync(ctx context.Context, resourceGroupName string, VMScaleSetName string, parameters compute.VirtualMachineScaleSet) (*azure.Future, *retry.Error) {
func (c *Client) CreateOrUpdateAsync(ctx context.Context, resourceGroupName string, VMScaleSetName string, parameters VirtualMachineScaleSet, etag string) (*azure.Future, *retry.Error) {
mc := metrics.NewMetricContext("vmss", "create_or_update_async", resourceGroupName, c.subscriptionID, "")

// Report errors if the client is rate limited.
Expand All @@ -275,7 +275,12 @@ func (c *Client) CreateOrUpdateAsync(ctx context.Context, resourceGroupName stri
VMScaleSetName,
)

future, rerr := c.armClient.PutResourceAsync(ctx, resourceID, parameters)
decorators := []autorest.PrepareDecorator{}
if etag != "" {
decorators = append(decorators, autorest.WithHeader("If-Match", autorest.String(etag)))
}

future, rerr := c.armClient.PutResourceAsync(ctx, resourceID, parameters, decorators...)
mc.Observe(rerr)
if rerr != nil {
if rerr.IsThrottled() {
Expand Down Expand Up @@ -318,14 +323,20 @@ func (c *Client) WaitForAsyncOperationResult(ctx context.Context, future *azure.
}

// createOrUpdateVMSS creates or updates a VirtualMachineScaleSet.
func (c *Client) createOrUpdateVMSS(ctx context.Context, resourceGroupName string, VMScaleSetName string, parameters compute.VirtualMachineScaleSet) *retry.Error {
func (c *Client) createOrUpdateVMSS(ctx context.Context, resourceGroupName string, VMScaleSetName string, parameters VirtualMachineScaleSet, etag string) *retry.Error {
resourceID := armclient.GetResourceID(
c.subscriptionID,
resourceGroupName,
vmssResourceType,
VMScaleSetName,
)
response, rerr := c.armClient.PutResource(ctx, resourceID, parameters)

decorators := []autorest.PrepareDecorator{}
if etag != "" {
decorators = append(decorators, autorest.WithHeader("If-Match", autorest.String(etag)))
}

response, rerr := c.armClient.PutResource(ctx, resourceID, parameters, decorators...)
defer c.armClient.CloseResponse(ctx, response)
if rerr != nil {
klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "vmss.put.request", resourceID, rerr.Error())
Expand Down Expand Up @@ -353,7 +364,7 @@ func (c *Client) createOrUpdateResponder(resp *http.Response) (*compute.VirtualM
return result, retry.GetError(resp, err)
}

func (c *Client) listResponder(resp *http.Response) (result compute.VirtualMachineScaleSetListResult, err error) {
func (c *Client) listResponder(resp *http.Response) (result VirtualMachineScaleSetListResult, err error) {
err = autorest.Respond(
resp,
autorest.ByIgnoring(),
Expand All @@ -365,7 +376,7 @@ func (c *Client) listResponder(resp *http.Response) (result compute.VirtualMachi

// virtualMachineScaleSetListResultPreparer prepares a request to retrieve the next set of results.
// It returns nil if no more results exist.
func (c *Client) virtualMachineScaleSetListResultPreparer(ctx context.Context, vmsslr compute.VirtualMachineScaleSetListResult) (*http.Request, error) {
func (c *Client) virtualMachineScaleSetListResultPreparer(ctx context.Context, vmsslr VirtualMachineScaleSetListResult) (*http.Request, error) {
if vmsslr.NextLink == nil || len(ptr.Deref(vmsslr.NextLink, "")) < 1 {
return nil, nil
}
Expand All @@ -377,7 +388,7 @@ func (c *Client) virtualMachineScaleSetListResultPreparer(ctx context.Context, v
}

// listNextResults retrieves the next set of results, if any.
func (c *Client) listNextResults(ctx context.Context, lastResults compute.VirtualMachineScaleSetListResult) (result compute.VirtualMachineScaleSetListResult, err error) {
func (c *Client) listNextResults(ctx context.Context, lastResults VirtualMachineScaleSetListResult) (result VirtualMachineScaleSetListResult, err error) {
req, err := c.virtualMachineScaleSetListResultPreparer(ctx, lastResults)
if err != nil {
return result, autorest.NewErrorWithError(err, "vmssclient", "listNextResults", nil, "Failure preparing next results request")
Expand All @@ -403,8 +414,8 @@ func (c *Client) listNextResults(ctx context.Context, lastResults compute.Virtua

// VirtualMachineScaleSetListResultPage contains a page of VirtualMachineScaleSet values.
type VirtualMachineScaleSetListResultPage struct {
fn func(context.Context, compute.VirtualMachineScaleSetListResult) (compute.VirtualMachineScaleSetListResult, error)
vmsslr compute.VirtualMachineScaleSetListResult
fn func(context.Context, VirtualMachineScaleSetListResult) (VirtualMachineScaleSetListResult, error)
vmsslr VirtualMachineScaleSetListResult
}

// NextWithContext advances to the next page of values. If there was an error making
Expand All @@ -431,12 +442,12 @@ func (page VirtualMachineScaleSetListResultPage) NotDone() bool {
}

// Response returns the raw server response from the last page request.
func (page VirtualMachineScaleSetListResultPage) Response() compute.VirtualMachineScaleSetListResult {
func (page VirtualMachineScaleSetListResultPage) Response() VirtualMachineScaleSetListResult {
return page.vmsslr
}

// Values returns the slice of values for the current page or nil if there are no values.
func (page VirtualMachineScaleSetListResultPage) Values() []compute.VirtualMachineScaleSet {
func (page VirtualMachineScaleSetListResultPage) Values() []VirtualMachineScaleSet {
if page.vmsslr.IsEmpty() {
return nil
}
Expand Down
Loading