-
Notifications
You must be signed in to change notification settings - Fork 256
Add status change event #3903
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Add status change event #3903
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,69 @@ | ||
| package events | ||
|
|
||
| import ( | ||
| "encoding/json" | ||
cfergeau marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| "github.com/crc-org/crc/v2/pkg/crc/logging" | ||
| "github.com/crc-org/crc/v2/pkg/crc/machine" | ||
| "github.com/crc-org/crc/v2/pkg/crc/machine/state" | ||
| "github.com/crc-org/crc/v2/pkg/crc/machine/types" | ||
| "github.com/crc-org/crc/v2/pkg/events" | ||
| "github.com/r3labs/sse/v2" | ||
| ) | ||
|
|
||
| type serializableEvent struct { | ||
| Status *types.ClusterStatusResult `json:"status"` | ||
| Error string `json:"error,omitempty"` | ||
| } | ||
|
|
||
| type statusChangeListener struct { | ||
| machineClient machine.Client | ||
| publisher EventPublisher | ||
| } | ||
|
|
||
| func newStatusChangeStream(server *EventServer) EventStream { | ||
| return newStream(newStatusChangeListener(server.machine), newEventPublisher(StatusChange, server.sseServer)) | ||
| } | ||
|
|
||
| func newStatusChangeListener(client machine.Client) EventProducer { | ||
| return &statusChangeListener{ | ||
| machineClient: client, | ||
| } | ||
| } | ||
|
|
||
| func (st *statusChangeListener) Notify(changedEvent events.StatusChangedEvent) { | ||
| logging.Debugf("State Changed Event %s", changedEvent) | ||
| var event serializableEvent | ||
| status, err := st.machineClient.Status() | ||
| // if we cannot receive actual state, send error state with error description | ||
| if err != nil { | ||
| event = serializableEvent{Status: &types.ClusterStatusResult{ | ||
| CrcStatus: state.Error, | ||
| }, Error: err.Error()} | ||
| } else { | ||
| // event could be fired, before actual code, which change state is called | ||
| // so status could contain 'old' state, replace it with state received in event | ||
| status.CrcStatus = changedEvent.State // override with actual reported state | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the part I find questionable in this PR. Instead of having a single source of truth for the status, we now have 2, some of the status comes from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Spending more time on it, I understand more the problems this is trying to solve. Without The comment however seems to indicate sometimes There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah, there's more details in an older discussion:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In my opinion, this is an indication that these events are not fired in the right place, and this is papered over with even more complexity :-/ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The API matches the usecase. Stop should in my opinion not return an error when already stopped; as you succeed in doing what is requested. What more information do we actually give with saying this failed as it was already stopped?! If you believe this should be fixed or recorded, make these states clear for the convergence. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The part you quoted is a description of the behaviour in this PR. These are some notes I took during review/testing when I was trying to understand the design choices which led to the code being reviewed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I try to replace this code, with firing event at more "proper" place, but face problem that Or change state handling first, and then rebase this PR on top of it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can go forward with the current code, but we really need more details in the commit log, so that it records the design choices you made when writing the code, so that we know the current limitations, why this overwrites the state, ... |
||
| event = serializableEvent{Status: status} | ||
| if changedEvent.Error != nil { | ||
| event.Error = changedEvent.Error.Error() | ||
| } | ||
|
|
||
| } | ||
| data, err := json.Marshal(event) | ||
| if err != nil { | ||
| logging.Errorf("Could not serealize status changed event in to JSON: %s", err) | ||
| return | ||
| } | ||
| st.publisher.Publish(&sse.Event{Event: []byte(StatusChange), Data: data}) | ||
| } | ||
|
|
||
| func (st *statusChangeListener) Start(publisher EventPublisher) { | ||
| st.publisher = publisher | ||
| events.StatusChanged.AddListener(st) | ||
|
|
||
| } | ||
|
|
||
| func (st *statusChangeListener) Stop() { | ||
| events.StatusChanged.RemoveListener(st) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,6 +10,7 @@ import ( | |
| "github.com/crc-org/crc/v2/pkg/crc/machine/state" | ||
| "github.com/crc-org/crc/v2/pkg/crc/machine/types" | ||
| crcPreset "github.com/crc-org/crc/v2/pkg/crc/preset" | ||
| "github.com/crc-org/crc/v2/pkg/events" | ||
| ) | ||
|
|
||
| const startCancelTimeout = 15 * time.Second | ||
|
|
@@ -69,6 +70,10 @@ func (s *Synchronized) Delete() error { | |
|
|
||
| err := s.underlying.Delete() | ||
| s.syncOperationDone <- Deleting | ||
|
|
||
| if err == nil { | ||
| events.StatusChanged.Fire(events.StatusChangedEvent{State: state.NoVM}) | ||
| } | ||
| return err | ||
| } | ||
|
|
||
|
|
@@ -80,6 +85,7 @@ func (s *Synchronized) prepareStart(startCancel context.CancelFunc) error { | |
| } | ||
| s.startCancel = startCancel | ||
| s.currentState = Starting | ||
| events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Starting}) | ||
|
|
||
| return nil | ||
| } | ||
|
|
@@ -92,6 +98,13 @@ func (s *Synchronized) Start(ctx context.Context, startConfig types.StartConfig) | |
|
|
||
| startResult, err := s.underlying.Start(ctx, startConfig) | ||
| s.syncOperationDone <- Starting | ||
|
|
||
| if err == nil { | ||
| events.StatusChanged.Fire(events.StatusChangedEvent{State: startResult.Status}) | ||
| } else { | ||
| events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err}) | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (commenting here, but that applies to most of the file). Can we emit a Regarding errors, it seems if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Not sure to understand how to resolve you concern, the problem is that we don't have a way to track state change(at least I wasn’t able to find it, it would be nice if you point me that place if if it exist)
I first do that, but I face a race condition with updating state, for example: Possible solution to that, may be moving state change event firing from
It is matched, as we send with SSE status reported by There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My main concern is #3903 (comment) , and I'm under the impression that if you have a terminal showing the stream of events, and another running There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (I understand the current code base is missing a centralized "setState" facility, and that this PR has to find a way of doing this, just trying to understand the limitations in the PR) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
No, output should be exactly the same, to get that I add Otherwise we need to move event triggering deeper in crc code, and fire event after execution of pice of code which change And it is a question for all, should we integrate events in more depths of CRC core(which leads to spreading events over our codebase) or keep as is( in relative hi level Client interface implementation )? @gbraad @cfergeau @praveenkumar WDYT? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I'd say we can go with this for now, more details about the design choices, the implementation limitations, ... One thing we can explore over time some centralized tracking of the cluster state, a |
||
|
|
||
| return startResult, err | ||
| } | ||
|
|
||
|
|
@@ -136,10 +149,16 @@ func (s *Synchronized) Stop() (state.State, error) { | |
| if err := s.prepareStopDelete(Stopping); err != nil { | ||
| return state.Error, err | ||
| } | ||
| events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Stopping}) | ||
|
|
||
| st, err := s.underlying.Stop() | ||
| s.syncOperationDone <- Stopping | ||
|
|
||
| if err == nil { | ||
| events.StatusChanged.Fire(events.StatusChangedEvent{State: st}) | ||
| } else { | ||
| events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err}) | ||
| } | ||
| return st, err | ||
| } | ||
|
|
||
|
|
@@ -160,7 +179,14 @@ func (s *Synchronized) ConnectionDetails() (*types.ConnectionDetails, error) { | |
| } | ||
|
|
||
| func (s *Synchronized) PowerOff() error { | ||
| return s.underlying.PowerOff() | ||
| err := s.underlying.PowerOff() | ||
| if err != nil { | ||
| events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Stopped}) | ||
| } else { | ||
| events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err}) | ||
| } | ||
|
|
||
| return err | ||
| } | ||
|
|
||
| func (s *Synchronized) Status() (*types.ClusterStatusResult, error) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| package events | ||
|
|
||
| import ( | ||
| "sync" | ||
| ) | ||
|
|
||
| type Event[T any] interface { | ||
cfergeau marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| AddListener(listener Notifiable[T]) | ||
| RemoveListener(listener Notifiable[T]) | ||
| Fire(data T) | ||
| } | ||
|
|
||
| type Notifiable[T any] interface { | ||
| Notify(event T) | ||
| } | ||
|
|
||
| type event[T any] struct { | ||
| listeners map[Notifiable[T]]Notifiable[T] | ||
| eventMutex sync.Mutex | ||
| } | ||
|
|
||
| func NewEvent[T any]() Event[T] { | ||
| return &event[T]{ | ||
| listeners: make(map[Notifiable[T]]Notifiable[T]), | ||
| } | ||
| } | ||
|
|
||
| func (e *event[T]) AddListener(listener Notifiable[T]) { | ||
| e.eventMutex.Lock() | ||
evidolob marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| defer e.eventMutex.Unlock() | ||
| e.listeners[listener] = listener | ||
| } | ||
|
|
||
| func (e *event[T]) RemoveListener(listener Notifiable[T]) { | ||
| e.eventMutex.Lock() | ||
| defer e.eventMutex.Unlock() | ||
| delete(e.listeners, listener) | ||
| } | ||
|
|
||
| func (e *event[T]) Fire(event T) { | ||
| e.eventMutex.Lock() | ||
| defer e.eventMutex.Unlock() | ||
evidolob marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| for _, listener := range e.listeners { | ||
cfergeau marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // shadowing for loop variable, need to remove after golang 1.22 migration | ||
| listener := listener | ||
| go listener.Notify(event) | ||
evidolob marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.