-
Notifications
You must be signed in to change notification settings - Fork 72
Add integration test for Resync request subscriber #488
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,85 @@ | ||||||||||||
| package tests | ||||||||||||
|
|
||||||||||||
| import ( | ||||||||||||
| "encoding/json" | ||||||||||||
| "testing" | ||||||||||||
| "time" | ||||||||||||
|
|
||||||||||||
| "github.com/meshery/meshkit/broker" | ||||||||||||
| "github.com/nats-io/nats.go" | ||||||||||||
|
|
||||||||||||
| "github.com/meshery/meshsync/internal/config" | ||||||||||||
| ) | ||||||||||||
|
|
||||||||||||
| func TestIntegrationResyncFlowViaBroker(t *testing.T) { | ||||||||||||
| brokerURL := "nats://localhost:4222" | ||||||||||||
|
|
||||||||||||
| // connect to broker | ||||||||||||
| nc, err := nats.Connect(brokerURL) | ||||||||||||
| if err != nil { | ||||||||||||
| t.Fatalf("failed to connect to broker: %v", err) | ||||||||||||
| } | ||||||||||||
| defer nc.Drain() | ||||||||||||
|
|
||||||||||||
| // load config | ||||||||||||
| cfg, err := config.New("viper") | ||||||||||||
| if err != nil { | ||||||||||||
| t.Fatalf("failed to load config: %v", err) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| // listener config (loaded even if unused) | ||||||||||||
| _ = cfg | ||||||||||||
|
|
||||||||||||
| // subjects | ||||||||||||
| requestSubject := "meshery.meshsync.request" | ||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The request subject is hardcoded. To improve maintainability and avoid magic strings, it's better to use the value defined in the configuration. You can get it from
Suggested change
|
||||||||||||
| responseSubject := config.DefaultPublishingSubject | ||||||||||||
|
|
||||||||||||
| // subscribe to resource events | ||||||||||||
| msgs := make(chan *nats.Msg, 200) | ||||||||||||
| sub, err := nc.ChanSubscribe(responseSubject, msgs) | ||||||||||||
| if err != nil { | ||||||||||||
| t.Fatalf("failed to subscribe to resource subject: %v", err) | ||||||||||||
| } | ||||||||||||
| defer sub.Unsubscribe() | ||||||||||||
|
|
||||||||||||
| // STEP 1 — wait for first discovery event (MeshSync must warm up) | ||||||||||||
| warmupTimeout := time.After(120 * time.Second) | ||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||
| receivedInitial := false | ||||||||||||
| for !receivedInitial { | ||||||||||||
| select { | ||||||||||||
| case msg := <-msgs: | ||||||||||||
| var m broker.Message | ||||||||||||
| if json.Unmarshal(msg.Data, &m) == nil && m.Object != nil { | ||||||||||||
| receivedInitial = true | ||||||||||||
| } | ||||||||||||
|
Comment on lines
+50
to
+54
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The logic for unmarshaling and validating a NATS message is duplicated here and again at lines 75-80. To improve code clarity and reduce duplication, consider extracting this logic into a helper function. For example: func isResourceEvent(msg *nats.Msg) bool {
var m broker.Message
return json.Unmarshal(msg.Data, &m) == nil && m.Object != nil
}You could then call this function in your |
||||||||||||
| case <-warmupTimeout: | ||||||||||||
| t.Fatalf("timeout: initial discovery events never arrived — MeshSync never warmed up") | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| // STEP 2 — send resync AFTER MeshSync is initialized | ||||||||||||
| req := &broker.Message{ | ||||||||||||
| Request: &broker.RequestObject{ | ||||||||||||
| Entity: broker.ReSyncDiscoveryEntity, | ||||||||||||
| }, | ||||||||||||
| } | ||||||||||||
| payload, _ := json.Marshal(req) | ||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The error returned by
Suggested change
|
||||||||||||
| if err := nc.Publish(requestSubject, payload); err != nil { | ||||||||||||
| t.Fatalf("failed to publish resync request: %v", err) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| // STEP 3 — expect NEW resource events after resync | ||||||||||||
| afterTimeout := time.After(120 * time.Second) | ||||||||||||
| for { | ||||||||||||
| select { | ||||||||||||
| case msg := <-msgs: | ||||||||||||
| var m broker.Message | ||||||||||||
| if json.Unmarshal(msg.Data, &m) == nil && m.Object != nil { | ||||||||||||
| // resync confirmed | ||||||||||||
| return | ||||||||||||
| } | ||||||||||||
| case <-afterTimeout: | ||||||||||||
| t.Fatalf("timeout: no resource events were received after resync request") | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For consistency with other integration tests in this package, consider using the
testMeshsyncNatsURLvariable instead of a hardcoded string for the broker URL. This variable is defined inintegration_test.goand is accessible within the same package.