From 5f750d1bf21a498e1c7c0ed54907839162d8438c Mon Sep 17 00:00:00 2001 From: Integralist Date: Fri, 1 Nov 2024 11:18:19 +0000 Subject: [PATCH] refactor(kvstore): improve key deletion performance --- fastly/resource_fastly_kvstore.go | 172 +++++++++++++++++++++++++----- 1 file changed, 144 insertions(+), 28 deletions(-) diff --git a/fastly/resource_fastly_kvstore.go b/fastly/resource_fastly_kvstore.go index 5e1661c24..f3d178796 100644 --- a/fastly/resource_fastly_kvstore.go +++ b/fastly/resource_fastly_kvstore.go @@ -4,8 +4,9 @@ import ( "context" "fmt" "log" - "sort" + "sync" + "github.com/fastly/go-fastly/v9/fastly" gofastly "github.com/fastly/go-fastly/v9/fastly" "github.com/hashicorp/terraform-plugin-sdk/v2/diag" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" @@ -44,6 +45,18 @@ func resourceFastlyKVStore() *schema.Resource { Description: "A unique name to identify the KV Store. It is important to note that changing this attribute will delete and recreate the KV Store, and discard the current entries. You MUST first delete the associated resource_link block from your service before modifying this field.", ForceNew: true, }, + "delete_keys_pool_size": { + Type: schema.TypeInt, + Optional: true, + Default: 100, + Description: "Used only with `force_destroy` to define the size of the thread-pool used when deleting keys concurrently.", + }, + "delete_keys_max_errors": { + Type: schema.TypeInt, + Optional: true, + Default: 100, + Description: "Used only with `force_destroy` to define the buffer length of a channel holding any errors while deleting keys concurrently.", + }, }, } } @@ -107,37 +120,43 @@ func resourceFastlyKVStoreUpdate(_ context.Context, _ *schema.ResourceData, _ an func resourceFastlyKVStoreDelete(_ context.Context, d *schema.ResourceData, meta any) diag.Diagnostics { conn := meta.(*APIClient).conn - if !d.Get("force_destroy").(bool) { - mayDelete, err := isKVStoreEmpty(d.Id(), conn) - if err != nil { - return diag.FromErr(err) - } + storeEmpty, err := isKVStoreEmpty(d.Id(), conn) + if err != nil { + return diag.FromErr(err) + } - if !mayDelete { + if !storeEmpty { + if !d.Get("force_destroy").(bool) { return diag.FromErr(fmt.Errorf("cannot delete KV Store (%s), it is not empty. Either delete the entries first, or set force_destroy to true and apply it before making this change", d.Id())) } - } - - // IMPORTANT: We must delete all keys first before we can delete the store. - p := conn.NewListKVStoreKeysPaginator(&gofastly.ListKVStoreKeysInput{ - StoreID: d.Id(), - }) - for p.Next() { - keys := p.Keys() - sort.Strings(keys) - for _, key := range keys { - err := conn.DeleteKVStoreKey(&gofastly.DeleteKVStoreKeyInput{ - StoreID: d.Id(), - Key: key, - }) - if err != nil { - return diag.FromErr(fmt.Errorf("error during KV Store key cleanup: %w", err)) - } + maxErrors := d.Get("delete_keys_max_errors").(int) + poolSize := d.Get("delete_keys_pool_size").(int) + err := deleteAllKVStoreKeys(conn, d.Id(), maxErrors, poolSize) + if err != nil { + return diag.FromErr(fmt.Errorf("failed to delete all KV Store keys: %w", err)) } } - if err := p.Err(); err != nil { - return diag.FromErr(fmt.Errorf("error during KV Store cleanup pagination: %w", err)) - } + + // p := conn.NewListKVStoreKeysPaginator(&gofastly.ListKVStoreKeysInput{ + // StoreID: d.Id(), + // }) + // for p.Next() { + // keys := p.Keys() + // sort.Strings(keys) + // for _, key := range keys { + // err := conn.DeleteKVStoreKey(&gofastly.DeleteKVStoreKeyInput{ + // StoreID: d.Id(), + // Key: key, + // }) + // if err != nil { + // return diag.FromErr(fmt.Errorf("error during KV Store key cleanup: %w", err)) + // } + // } + // } + // if err := p.Err(); err != nil { + // return diag.FromErr(fmt.Errorf("error during KV Store cleanup pagination: %w", err)) + // } + // input := gofastly.DeleteKVStoreInput{ StoreID: d.Id(), @@ -145,13 +164,110 @@ func resourceFastlyKVStoreDelete(_ context.Context, d *schema.ResourceData, meta log.Printf("[DEBUG] DELETE: KV Store input: %#v", input) - err := conn.DeleteKVStore(&input) + err = conn.DeleteKVStore(&input) if err != nil { return diag.FromErr(err) } return nil } +// deleteAllKVStoreKeys deletes all keys within the specified KV Store. +func deleteAllKVStoreKeys(conn *gofastly.Client, storeID string, maxErrors, poolSize int) error { + p := conn.NewListKVStoreKeysPaginator(&fastly.ListKVStoreKeysInput{ + StoreID: storeID, + }) + + errorsCh := make(chan string, maxErrors) + keysCh := make(chan string, 1000) // number correlates to pagination page size + + var ( + failedKeys []string + mu sync.Mutex + wgProcessing sync.WaitGroup + wgErrorCh sync.WaitGroup + ) + + // We have three separate execution flows happening at once: + // + // 1. Pushing keys from pagination data into a key channel. + // 2. Pulling keys from error channel and appending to failedKeys slice. + // 3. Pulling keys from key channel and issuing API DELETE call. + // + // The second item is problematic, in that ranging over a channel only + // terminates when the channel is closed. So we need to ensure we close the + // errorsCh once we've finished processing the deletion of all the keys. + // + // To do that we need two sets of wait groups. + // + // The first is wgProcessing which keeps track of all goroutines related to + // processing the pagination data (e.g. the goroutine ranging over the + // paginator keys, and the goroutine ranging over the keysCh as part of the + // poolSize loop). + // + // The second wait group is wgErrorCh which tracks when the first + // (wgProcessing) has completed and then closes errorsCh. + + // The following goroutine finishes once all pagination keys have been + // processed. + wgProcessing.Add(1) + go func() { + defer wgProcessing.Done() + defer close(keysCh) + for p.Next() { + for _, key := range p.Keys() { + keysCh <- key + } + } + }() + + // The following goroutine finishes once the errorsCh is closed. + wgErrorCh.Add(1) + go func() { + defer wgErrorCh.Done() + for err := range errorsCh { + mu.Lock() + failedKeys = append(failedKeys, err) + mu.Unlock() + } + }() + + // The following goroutines close once they've pulled all data from keysCh. + for i := 1; i <= poolSize; i++ { + wgProcessing.Add(1) + go func() { + defer wgProcessing.Done() + for key := range keysCh { + err := conn.DeleteKVStoreKey(&fastly.DeleteKVStoreKeyInput{StoreID: storeID, Key: key}) + if err != nil { + select { + case errorsCh <- key: + default: + continue + } + } + } + }() + } + + // The following goroutine is closed once the 'processing' goroutines are + // finished. + wgErrorCh.Add(1) + go func() { + defer wgErrorCh.Done() + wgProcessing.Wait() // Wait for all deletion and pagination tasks. + close(errorsCh) + }() + + // Wait for the error-handling goroutines to finish processing. + wgErrorCh.Wait() + + if len(failedKeys) > 0 { + return fmt.Errorf("failed to delete %d keys", len(failedKeys)) + } + + return nil +} + func isKVStoreEmpty(storeID string, conn *gofastly.Client) (bool, error) { keys, err := conn.ListKVStoreKeys(&gofastly.ListKVStoreKeysInput{ StoreID: storeID,