Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(api): Improve error handling for invalid domains #18

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
GOLANGCI_VERSION = 1.53.3
GOLANGCI_VERSION = 1.59.1
LICENCES_IGNORE_LIST = $(shell cat licenses/licenses-ignore-list.txt)

VERSION ?= 0.0.1
Expand Down
198 changes: 78 additions & 120 deletions internal/stackitprovider/apply_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stackitprovider
import (
"context"
"fmt"
"sync"

stackitdnsclient "github.com/stackitcloud/stackit-sdk-go/services/dns"
"go.uber.org/zap"
Expand All @@ -12,43 +13,99 @@ import (

// ApplyChanges applies a given set of changes in a given zone.
func (d *StackitDNSProvider) ApplyChanges(ctx context.Context, changes *plan.Changes) error {
var tasks []changeTask
// create rr set. POST /v1/projects/{projectId}/zones/{zoneId}/rrsets
err := d.createRRSets(ctx, changes.Create)
if err != nil {
return err
}

tasks = append(tasks, d.buildRRSetTasks(changes.Create, CREATE)...)
// update rr set. PATCH /v1/projects/{projectId}/zones/{zoneId}/rrsets/{rrSetId}
err = d.updateRRSets(ctx, changes.UpdateNew)
tasks = append(tasks, d.buildRRSetTasks(changes.UpdateNew, UPDATE)...)
d.logger.Info("records to delete", zap.String("records", fmt.Sprintf("%v", changes.Delete)))
// delete rr set. DELETE /v1/projects/{projectId}/zones/{zoneId}/rrsets/{rrSetId}
tasks = append(tasks, d.buildRRSetTasks(changes.Delete, DELETE)...)

zones, err := d.zoneFetcherClient.zones(ctx)
if err != nil {
return err
}

// delete rr set. DELETE /v1/projects/{projectId}/zones/{zoneId}/rrsets/{rrSetId}
err = d.deleteRRSets(ctx, changes.Delete)
if err != nil {
return err
return d.handleRRSetWithWorkers(ctx, tasks, zones)
}

// handleRRSetWithWorkers handles the given endpoints with workers to optimize speed.
func (d *StackitDNSProvider) buildRRSetTasks(
endpoints []*endpoint.Endpoint,
action string,
) []changeTask {
tasks := make([]changeTask, 0, len(endpoints))

for _, change := range endpoints {
tasks = append(tasks, changeTask{
action: action,
change: change,
})
}

return nil
return tasks
}

// createRRSets creates new record sets in the stackitprovider for the given endpoints that are in the
// creation field.
func (d *StackitDNSProvider) createRRSets(
// handleRRSetWithWorkers handles the given endpoints with workers to optimize speed.
func (d *StackitDNSProvider) handleRRSetWithWorkers(
ctx context.Context,
endpoints []*endpoint.Endpoint,
tasks []changeTask,
zones []stackitdnsclient.Zone,
) error {
if len(endpoints) == 0 {
return nil
workerChannel := make(chan changeTask, len(tasks))
errorChannel := make(chan error, len(tasks))

var wg sync.WaitGroup
for i := 0; i < d.workers; i++ {
wg.Add(1)
go d.changeWorker(ctx, workerChannel, errorChannel, zones, &wg)
}

zones, err := d.zoneFetcherClient.zones(ctx)
if err != nil {
return err
for _, task := range tasks {
workerChannel <- task
}
close(workerChannel)

// capture first error
var err error
for i := 0; i < len(tasks); i++ {
err = <-errorChannel
if err != nil {
break
}
}

// wait until all workers have finished
wg.Wait()

return err
}

// changeWorker is a worker that handles changes passed by a channel.
func (d *StackitDNSProvider) changeWorker(
ctx context.Context,
changes chan changeTask,
errorChannel chan error,
zones []stackitdnsclient.Zone,
wg *sync.WaitGroup,
) {
defer wg.Done()

for change := range changes {
var err error
switch change.action {
case CREATE:
err = d.createRRSet(ctx, change.change, zones)
case UPDATE:
err = d.updateRRSet(ctx, change.change, zones)
case DELETE:
err = d.deleteRRSet(ctx, change.change, zones)
}
errorChannel <- err
}

return d.handleRRSetWithWorkers(ctx, endpoints, zones, CREATE)
d.logger.Debug("change worker finished")
}

// createRRSet creates a new record set in the stackitprovider for the given endpoint.
Expand Down Expand Up @@ -88,24 +145,6 @@ func (d *StackitDNSProvider) createRRSet(
return nil
}

// updateRRSets patches (overrides) contents in the record sets in the stackitprovider for the given
// endpoints that are in the update new field.
func (d *StackitDNSProvider) updateRRSets(
ctx context.Context,
endpoints []*endpoint.Endpoint,
) error {
if len(endpoints) == 0 {
return nil
}

zones, err := d.zoneFetcherClient.zones(ctx)
if err != nil {
return err
}

return d.handleRRSetWithWorkers(ctx, endpoints, zones, UPDATE)
}

// updateRRSet patches (overrides) contents in the record set in the stackitprovider.
func (d *StackitDNSProvider) updateRRSet(
ctx context.Context,
Expand Down Expand Up @@ -142,28 +181,6 @@ func (d *StackitDNSProvider) updateRRSet(
return nil
}

// deleteRRSets deletes record sets in the stackitprovider for the given endpoints that are in the
// deletion field.
func (d *StackitDNSProvider) deleteRRSets(
ctx context.Context,
endpoints []*endpoint.Endpoint,
) error {
if len(endpoints) == 0 {
d.logger.Debug("no endpoints to delete")

return nil
}

d.logger.Info("records to delete", zap.String("records", fmt.Sprintf("%v", endpoints)))

zones, err := d.zoneFetcherClient.zones(ctx)
if err != nil {
return err
}

return d.handleRRSetWithWorkers(ctx, endpoints, zones, DELETE)
}

// deleteRRSet deletes a record set in the stackitprovider for the given endpoint.
func (d *StackitDNSProvider) deleteRRSet(
ctx context.Context,
Expand Down Expand Up @@ -197,62 +214,3 @@ func (d *StackitDNSProvider) deleteRRSet(

return nil
}

// handleRRSetWithWorkers handles the given endpoints with workers to optimize speed.
func (d *StackitDNSProvider) handleRRSetWithWorkers(
ctx context.Context,
endpoints []*endpoint.Endpoint,
zones []stackitdnsclient.Zone,
action string,
) error {
workerChannel := make(chan changeTask, len(endpoints))
errorChannel := make(chan error, len(endpoints))

for i := 0; i < d.workers; i++ {
go d.changeWorker(ctx, workerChannel, errorChannel, zones)
}

for _, change := range endpoints {
workerChannel <- changeTask{
action: action,
change: change,
}
}

for i := 0; i < len(endpoints); i++ {
err := <-errorChannel
if err != nil {
close(workerChannel)

return err
}
}

close(workerChannel)

return nil
}

// changeWorker is a worker that handles changes passed by a channel.
func (d *StackitDNSProvider) changeWorker(
ctx context.Context,
changes chan changeTask,
errorChannel chan error,
zones []stackitdnsclient.Zone,
) {
for change := range changes {
switch change.action {
case CREATE:
err := d.createRRSet(ctx, change.change, zones)
errorChannel <- err
case UPDATE:
err := d.updateRRSet(ctx, change.change, zones)
errorChannel <- err
case DELETE:
err := d.deleteRRSet(ctx, change.change, zones)
errorChannel <- err
}
}

d.logger.Debug("change worker finished")
}
56 changes: 56 additions & 0 deletions internal/stackitprovider/apply_changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stackitprovider
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"testing"
Expand Down Expand Up @@ -143,6 +144,61 @@ func TestNoRRSetFound(t *testing.T) {
assert.Error(t, err)
}

func TestPartialUpdate(t *testing.T) {
t.Parallel()

ctx := context.Background()
validZoneResponse := getValidResponseZoneAllBytes(t)

mux := http.NewServeMux()
server := httptest.NewServer(mux)
defer server.Close()

// Set up common endpoint for all types of changes
setUpCommonEndpoints(mux, validZoneResponse, http.StatusOK)
// Set up change type-specific endpoints
// based on setUpChangeTypeEndpoints(t, mux, validRRSetResponse, http.StatusOK, Update)
// but extended to check that the rrset is updated
rrSetUpdated := false
mux.HandleFunc(
"/v1/projects/1234/zones/1234/rrsets/1234",
func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
fmt.Println(r.Method)
if r.Method == http.MethodPatch {
rrSetUpdated = true
}
},
)
mux.HandleFunc(
"/v1/projects/1234/zones/1234/rrsets",
func(w http.ResponseWriter, r *http.Request) {
getRrsetsResponseRecordsNonPaged(t, w, "1234")
},
)
mux.HandleFunc(
"/v1/projects/1234/zones/5678/rrsets",
func(w http.ResponseWriter, r *http.Request) {
getRrsetsResponseRecordsNonPaged(t, w, "5678")
},
)

stackitDnsProvider, err := getDefaultTestProvider(server)
assert.NoError(t, err)

// Create update change
changes := getChangeTypeChanges(Update)
// Add task to create invalid endpoint
changes.Create = []*endpoint.Endpoint{
{DNSName: "notfound.com", Targets: endpoint.Targets{"test.notfound.com"}},
}

err = stackitDnsProvider.ApplyChanges(ctx, changes)
assert.Error(t, err)
assert.True(t, rrSetUpdated, "rrset was not updated")
}

// setUpCommonEndpoints for all change types.
func setUpCommonEndpoints(mux *http.ServeMux, responseZone []byte, responseZoneCode int) {
mux.HandleFunc("/v1/projects/1234/zones", func(w http.ResponseWriter, r *http.Request) {
Expand Down
Loading