Skip to content

Commit

Permalink
refactor(kvstore): improve key deletion performance
Browse files Browse the repository at this point in the history
  • Loading branch information
Integralist committed Nov 1, 2024
1 parent de01fe6 commit 5f750d1
Showing 1 changed file with 144 additions and 28 deletions.
172 changes: 144 additions & 28 deletions fastly/resource_fastly_kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import (
"context"
"fmt"
"log"
"sort"
"sync"

"github.com/fastly/go-fastly/v9/fastly"
gofastly "github.com/fastly/go-fastly/v9/fastly"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
Expand Down Expand Up @@ -44,6 +45,18 @@ func resourceFastlyKVStore() *schema.Resource {
Description: "A unique name to identify the KV Store. It is important to note that changing this attribute will delete and recreate the KV Store, and discard the current entries. You MUST first delete the associated resource_link block from your service before modifying this field.",
ForceNew: true,
},
"delete_keys_pool_size": {
Type: schema.TypeInt,
Optional: true,
Default: 100,
Description: "Used only with `force_destroy` to define the size of the thread-pool used when deleting keys concurrently.",
},
"delete_keys_max_errors": {
Type: schema.TypeInt,
Optional: true,
Default: 100,
Description: "Used only with `force_destroy` to define the buffer length of a channel holding any errors while deleting keys concurrently.",
},
},
}
}
Expand Down Expand Up @@ -107,51 +120,154 @@ func resourceFastlyKVStoreUpdate(_ context.Context, _ *schema.ResourceData, _ an
func resourceFastlyKVStoreDelete(_ context.Context, d *schema.ResourceData, meta any) diag.Diagnostics {
conn := meta.(*APIClient).conn

if !d.Get("force_destroy").(bool) {
mayDelete, err := isKVStoreEmpty(d.Id(), conn)
if err != nil {
return diag.FromErr(err)
}
storeEmpty, err := isKVStoreEmpty(d.Id(), conn)
if err != nil {
return diag.FromErr(err)
}

if !mayDelete {
if !storeEmpty {
if !d.Get("force_destroy").(bool) {
return diag.FromErr(fmt.Errorf("cannot delete KV Store (%s), it is not empty. Either delete the entries first, or set force_destroy to true and apply it before making this change", d.Id()))
}
}

// IMPORTANT: We must delete all keys first before we can delete the store.
p := conn.NewListKVStoreKeysPaginator(&gofastly.ListKVStoreKeysInput{
StoreID: d.Id(),
})
for p.Next() {
keys := p.Keys()
sort.Strings(keys)
for _, key := range keys {
err := conn.DeleteKVStoreKey(&gofastly.DeleteKVStoreKeyInput{
StoreID: d.Id(),
Key: key,
})
if err != nil {
return diag.FromErr(fmt.Errorf("error during KV Store key cleanup: %w", err))
}
maxErrors := d.Get("delete_keys_max_errors").(int)
poolSize := d.Get("delete_keys_pool_size").(int)
err := deleteAllKVStoreKeys(conn, d.Id(), maxErrors, poolSize)
if err != nil {
return diag.FromErr(fmt.Errorf("failed to delete all KV Store keys: %w", err))
}
}
if err := p.Err(); err != nil {
return diag.FromErr(fmt.Errorf("error during KV Store cleanup pagination: %w", err))
}

// p := conn.NewListKVStoreKeysPaginator(&gofastly.ListKVStoreKeysInput{
// StoreID: d.Id(),
// })
// for p.Next() {
// keys := p.Keys()
// sort.Strings(keys)
// for _, key := range keys {
// err := conn.DeleteKVStoreKey(&gofastly.DeleteKVStoreKeyInput{
// StoreID: d.Id(),
// Key: key,
// })
// if err != nil {
// return diag.FromErr(fmt.Errorf("error during KV Store key cleanup: %w", err))
// }
// }
// }
// if err := p.Err(); err != nil {
// return diag.FromErr(fmt.Errorf("error during KV Store cleanup pagination: %w", err))
// }
//

input := gofastly.DeleteKVStoreInput{
StoreID: d.Id(),
}

log.Printf("[DEBUG] DELETE: KV Store input: %#v", input)

err := conn.DeleteKVStore(&input)
err = conn.DeleteKVStore(&input)
if err != nil {
return diag.FromErr(err)
}
return nil
}

// deleteAllKVStoreKeys deletes all keys within the specified KV Store.
func deleteAllKVStoreKeys(conn *gofastly.Client, storeID string, maxErrors, poolSize int) error {
p := conn.NewListKVStoreKeysPaginator(&fastly.ListKVStoreKeysInput{
StoreID: storeID,
})

errorsCh := make(chan string, maxErrors)
keysCh := make(chan string, 1000) // number correlates to pagination page size

var (
failedKeys []string
mu sync.Mutex
wgProcessing sync.WaitGroup
wgErrorCh sync.WaitGroup
)

// We have three separate execution flows happening at once:
//
// 1. Pushing keys from pagination data into a key channel.
// 2. Pulling keys from error channel and appending to failedKeys slice.
// 3. Pulling keys from key channel and issuing API DELETE call.
//
// The second item is problematic, in that ranging over a channel only
// terminates when the channel is closed. So we need to ensure we close the
// errorsCh once we've finished processing the deletion of all the keys.
//
// To do that we need two sets of wait groups.
//
// The first is wgProcessing which keeps track of all goroutines related to
// processing the pagination data (e.g. the goroutine ranging over the
// paginator keys, and the goroutine ranging over the keysCh as part of the
// poolSize loop).
//
// The second wait group is wgErrorCh which tracks when the first
// (wgProcessing) has completed and then closes errorsCh.

// The following goroutine finishes once all pagination keys have been
// processed.
wgProcessing.Add(1)
go func() {
defer wgProcessing.Done()
defer close(keysCh)
for p.Next() {
for _, key := range p.Keys() {
keysCh <- key
}
}
}()

// The following goroutine finishes once the errorsCh is closed.
wgErrorCh.Add(1)
go func() {
defer wgErrorCh.Done()
for err := range errorsCh {
mu.Lock()
failedKeys = append(failedKeys, err)
mu.Unlock()
}
}()

// The following goroutines close once they've pulled all data from keysCh.
for i := 1; i <= poolSize; i++ {
wgProcessing.Add(1)
go func() {
defer wgProcessing.Done()
for key := range keysCh {
err := conn.DeleteKVStoreKey(&fastly.DeleteKVStoreKeyInput{StoreID: storeID, Key: key})
if err != nil {
select {
case errorsCh <- key:
default:
continue
}
}
}
}()
}

// The following goroutine is closed once the 'processing' goroutines are
// finished.
wgErrorCh.Add(1)
go func() {
defer wgErrorCh.Done()
wgProcessing.Wait() // Wait for all deletion and pagination tasks.
close(errorsCh)
}()

// Wait for the error-handling goroutines to finish processing.
wgErrorCh.Wait()

if len(failedKeys) > 0 {
return fmt.Errorf("failed to delete %d keys", len(failedKeys))
}

return nil
}

func isKVStoreEmpty(storeID string, conn *gofastly.Client) (bool, error) {
keys, err := conn.ListKVStoreKeys(&gofastly.ListKVStoreKeysInput{
StoreID: storeID,
Expand Down

0 comments on commit 5f750d1

Please sign in to comment.