diff --git a/cmd/crc/cmd/status.go b/cmd/crc/cmd/status.go index 790c690faf..c78cb2562a 100644 --- a/cmd/crc/cmd/status.go +++ b/cmd/crc/cmd/status.go @@ -13,6 +13,7 @@ import ( "github.com/crc-org/crc/v2/pkg/crc/constants" "github.com/crc-org/crc/v2/pkg/crc/daemonclient" crcErrors "github.com/crc-org/crc/v2/pkg/crc/errors" + "github.com/crc-org/crc/v2/pkg/crc/machine/state" "github.com/crc-org/crc/v2/pkg/crc/machine/types" "github.com/crc-org/crc/v2/pkg/crc/preset" "github.com/docker/go-units" @@ -88,7 +89,7 @@ func runWatchStatus(writer io.Writer, client *daemonclient.Client, cacheDir stri } }() - err = client.SSEClient.Status(func(loadResult *types.ClusterLoadResult) { + err = client.SSEClient.ClusterLoad(func(loadResult *types.ClusterLoadResult) { if !isPoolInit { ramBar, cpuBars = createBars(loadResult.CPUUse, writer) barPool = pb.NewPool(append([]*pb.ProgressBar{ramBar}, cpuBars...)...) @@ -152,6 +153,9 @@ func getStatus(client *daemonclient.Client, cacheDir string) *status { } return &status{Success: false, Error: crcErrors.ToSerializableError(err)} } + if clusterStatus.CrcStatus == string(state.NoVM) { + return &status{Success: false, Error: crcErrors.ToSerializableError(crcErrors.VMNotExist)} + } var size int64 err = filepath.Walk(cacheDir, func(_ string, info os.FileInfo, err error) error { if !info.IsDir() { diff --git a/pkg/crc/api/client/sse_client.go b/pkg/crc/api/client/sse_client.go index 776742acc5..38ebffee87 100644 --- a/pkg/crc/api/client/sse_client.go +++ b/pkg/crc/api/client/sse_client.go @@ -21,8 +21,8 @@ func NewSSEClient(transport *http.Transport) *SSEClient { } } -func (c *SSEClient) Status(statusCallback func(*types.ClusterLoadResult)) error { - err := c.client.Subscribe("status", func(msg *sse.Event) { +func (c *SSEClient) ClusterLoad(statusCallback func(*types.ClusterLoadResult)) error { + err := c.client.Subscribe("cluster_load", func(msg *sse.Event) { wmState := &types.ClusterLoadResult{} err := json.Unmarshal(msg.Data, wmState) if err != nil { diff --git a/pkg/crc/api/events/status.go b/pkg/crc/api/events/cluster_load_stream.go similarity index 81% rename from pkg/crc/api/events/status.go rename to pkg/crc/api/events/cluster_load_stream.go index 1b14234816..4814e50a26 100644 --- a/pkg/crc/api/events/status.go +++ b/pkg/crc/api/events/cluster_load_stream.go @@ -19,18 +19,18 @@ type TickListener struct { tickPeriod time.Duration } -func newStatusStream(server *EventServer) EventStream { - return newStream(NewStatusListener(server.machine), newEventPublisher(STATUS, server.sseServer)) +func newClusterLoadStream(server *EventServer) EventStream { + return newStream(newClusterLoadListener(server.machine), newEventPublisher(ClusterLoad, server.sseServer)) } -func NewStatusListener(machine crcMachine.Client) EventProducer { +func newClusterLoadListener(machine crcMachine.Client) EventProducer { getStatus := func() (interface{}, error) { return machine.GetClusterLoad() } - return NewTickListener(getStatus) + return newTickListener(getStatus) } -func NewTickListener(generator genData) EventProducer { +func newTickListener(generator genData) EventProducer { return &TickListener{ done: make(chan bool), generator: generator, diff --git a/pkg/crc/api/events/event_server.go b/pkg/crc/api/events/event_server.go index fbdfc25fc8..1a7e5155c8 100644 --- a/pkg/crc/api/events/event_server.go +++ b/pkg/crc/api/events/event_server.go @@ -54,8 +54,9 @@ func NewEventServer(machine machine.Client) *EventServer { stream.RemoveSubscriber(sub) } - sseServer.CreateStream(LOGS) - sseServer.CreateStream(STATUS) + sseServer.CreateStream(Logs) + sseServer.CreateStream(ClusterLoad) + sseServer.CreateStream(StatusChange) return eventServer } @@ -65,10 +66,12 @@ func (es *EventServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { func createEventStream(server *EventServer, streamID string) EventStream { switch streamID { - case LOGS: + case Logs: return newLogsStream(server) - case STATUS: - return newStatusStream(server) + case ClusterLoad: + return newClusterLoadStream(server) + case StatusChange: + return newStatusChangeStream(server) } return nil } diff --git a/pkg/crc/api/events/events.go b/pkg/crc/api/events/events.go index 012ad9dedb..027d35d7da 100644 --- a/pkg/crc/api/events/events.go +++ b/pkg/crc/api/events/events.go @@ -3,8 +3,9 @@ package events import "github.com/r3labs/sse/v2" const ( - LOGS = "logs" // Logs event channel, contains daemon logs - STATUS = "status" // status event channel, contains VM load info + Logs = "logs" // Logs event channel, contains daemon logs + ClusterLoad = "cluster_load" // status event channel, contains VM load info + StatusChange = "status_change" // status change channel, fires on 'starting', 'stopping', etc ) type EventPublisher interface { diff --git a/pkg/crc/api/events/log_stream.go b/pkg/crc/api/events/log_stream.go index 2601a38e87..329fda55cc 100644 --- a/pkg/crc/api/events/log_stream.go +++ b/pkg/crc/api/events/log_stream.go @@ -1,6 +1,8 @@ package events import ( + "bytes" + "github.com/crc-org/crc/v2/pkg/crc/logging" "github.com/r3labs/sse/v2" "github.com/sirupsen/logrus" @@ -56,7 +58,10 @@ func (s *streamHook) Fire(entry *logrus.Entry) error { return err } - s.server.Publish(LOGS, &sse.Event{Event: []byte(LOGS), Data: line}) + // remove "Line Feed"("\n") character which add was added by json.Encoder + line = bytes.TrimRight(line, "\n") + + s.server.Publish(Logs, &sse.Event{Event: []byte(Logs), Data: line}) return nil } diff --git a/pkg/crc/api/events/status_change_stream.go b/pkg/crc/api/events/status_change_stream.go new file mode 100644 index 0000000000..42d3daeb2f --- /dev/null +++ b/pkg/crc/api/events/status_change_stream.go @@ -0,0 +1,69 @@ +package events + +import ( + "encoding/json" + + "github.com/crc-org/crc/v2/pkg/crc/logging" + "github.com/crc-org/crc/v2/pkg/crc/machine" + "github.com/crc-org/crc/v2/pkg/crc/machine/state" + "github.com/crc-org/crc/v2/pkg/crc/machine/types" + "github.com/crc-org/crc/v2/pkg/events" + "github.com/r3labs/sse/v2" +) + +type serializableEvent struct { + Status *types.ClusterStatusResult `json:"status"` + Error string `json:"error,omitempty"` +} + +type statusChangeListener struct { + machineClient machine.Client + publisher EventPublisher +} + +func newStatusChangeStream(server *EventServer) EventStream { + return newStream(newStatusChangeListener(server.machine), newEventPublisher(StatusChange, server.sseServer)) +} + +func newStatusChangeListener(client machine.Client) EventProducer { + return &statusChangeListener{ + machineClient: client, + } +} + +func (st *statusChangeListener) Notify(changedEvent events.StatusChangedEvent) { + logging.Debugf("State Changed Event %s", changedEvent) + var event serializableEvent + status, err := st.machineClient.Status() + // if we cannot receive actual state, send error state with error description + if err != nil { + event = serializableEvent{Status: &types.ClusterStatusResult{ + CrcStatus: state.Error, + }, Error: err.Error()} + } else { + // event could be fired, before actual code, which change state is called + // so status could contain 'old' state, replace it with state received in event + status.CrcStatus = changedEvent.State // override with actual reported state + event = serializableEvent{Status: status} + if changedEvent.Error != nil { + event.Error = changedEvent.Error.Error() + } + + } + data, err := json.Marshal(event) + if err != nil { + logging.Errorf("Could not serealize status changed event in to JSON: %s", err) + return + } + st.publisher.Publish(&sse.Event{Event: []byte(StatusChange), Data: data}) +} + +func (st *statusChangeListener) Start(publisher EventPublisher) { + st.publisher = publisher + events.StatusChanged.AddListener(st) + +} + +func (st *statusChangeListener) Stop() { + events.StatusChanged.RemoveListener(st) +} diff --git a/pkg/crc/api/handlers.go b/pkg/crc/api/handlers.go index 0ac9fd4ece..42a5d188b4 100644 --- a/pkg/crc/api/handlers.go +++ b/pkg/crc/api/handlers.go @@ -50,14 +50,6 @@ func NewHandler(config *crcConfig.Config, machine machine.Client, logger Logger, } func (h *Handler) Status(c *context) error { - exists, err := h.Client.Exists() - if err != nil { - return err - } - if !exists { - return c.String(http.StatusInternalServerError, string(errors.VMNotExist)) - } - res, err := h.Client.Status() if err != nil { return err diff --git a/pkg/crc/machine/state/state.go b/pkg/crc/machine/state/state.go index 60bdfc2707..e3cddf5857 100644 --- a/pkg/crc/machine/state/state.go +++ b/pkg/crc/machine/state/state.go @@ -10,6 +10,7 @@ const ( Stopped State = "Stopped" Stopping State = "Stopping" Starting State = "Starting" + NoVM State = "NoVM" Error State = "Error" ) diff --git a/pkg/crc/machine/status.go b/pkg/crc/machine/status.go index 3f5bc9cb9a..239d59e302 100644 --- a/pkg/crc/machine/status.go +++ b/pkg/crc/machine/status.go @@ -18,8 +18,7 @@ func (client *client) Status() (*types.ClusterStatusResult, error) { if err != nil { if errors.Is(err, errMissingHost(client.name)) { return &types.ClusterStatusResult{ - CrcStatus: state.Stopped, - OpenshiftStatus: types.OpenshiftStopped, + CrcStatus: state.NoVM, }, nil } return nil, errors.Wrap(err, fmt.Sprintf("Cannot load '%s' virtual machine", client.name)) diff --git a/pkg/crc/machine/sync.go b/pkg/crc/machine/sync.go index 6da384b415..fe53086d1b 100644 --- a/pkg/crc/machine/sync.go +++ b/pkg/crc/machine/sync.go @@ -10,6 +10,7 @@ import ( "github.com/crc-org/crc/v2/pkg/crc/machine/state" "github.com/crc-org/crc/v2/pkg/crc/machine/types" crcPreset "github.com/crc-org/crc/v2/pkg/crc/preset" + "github.com/crc-org/crc/v2/pkg/events" ) const startCancelTimeout = 15 * time.Second @@ -69,6 +70,10 @@ func (s *Synchronized) Delete() error { err := s.underlying.Delete() s.syncOperationDone <- Deleting + + if err == nil { + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.NoVM}) + } return err } @@ -80,6 +85,7 @@ func (s *Synchronized) prepareStart(startCancel context.CancelFunc) error { } s.startCancel = startCancel s.currentState = Starting + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Starting}) return nil } @@ -92,6 +98,13 @@ func (s *Synchronized) Start(ctx context.Context, startConfig types.StartConfig) startResult, err := s.underlying.Start(ctx, startConfig) s.syncOperationDone <- Starting + + if err == nil { + events.StatusChanged.Fire(events.StatusChangedEvent{State: startResult.Status}) + } else { + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err}) + } + return startResult, err } @@ -136,10 +149,16 @@ func (s *Synchronized) Stop() (state.State, error) { if err := s.prepareStopDelete(Stopping); err != nil { return state.Error, err } + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Stopping}) st, err := s.underlying.Stop() s.syncOperationDone <- Stopping + if err == nil { + events.StatusChanged.Fire(events.StatusChangedEvent{State: st}) + } else { + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err}) + } return st, err } @@ -160,7 +179,14 @@ func (s *Synchronized) ConnectionDetails() (*types.ConnectionDetails, error) { } func (s *Synchronized) PowerOff() error { - return s.underlying.PowerOff() + err := s.underlying.PowerOff() + if err != nil { + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Stopped}) + } else { + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err}) + } + + return err } func (s *Synchronized) Status() (*types.ClusterStatusResult, error) { diff --git a/pkg/crc/machine/virtualmachine.go b/pkg/crc/machine/virtualmachine.go index f8af367c55..844e417911 100644 --- a/pkg/crc/machine/virtualmachine.go +++ b/pkg/crc/machine/virtualmachine.go @@ -32,6 +32,14 @@ func (err *MissingHostError) Error() string { return fmt.Sprintf("no such libmachine vm: %s", err.name) } +func (err *MissingHostError) Is(target error) bool { + var x *MissingHostError + if errors.As(target, &x) && x.name == err.name { + return true + } + return false +} + var errInvalidBundleMetadata = errors.New("Error loading bundle metadata") func loadVirtualMachine(name string, useVSock bool) (*virtualMachine, error) { diff --git a/pkg/events/emitter.go b/pkg/events/emitter.go new file mode 100644 index 0000000000..fa1cfbf9d8 --- /dev/null +++ b/pkg/events/emitter.go @@ -0,0 +1,48 @@ +package events + +import ( + "sync" +) + +type Event[T any] interface { + AddListener(listener Notifiable[T]) + RemoveListener(listener Notifiable[T]) + Fire(data T) +} + +type Notifiable[T any] interface { + Notify(event T) +} + +type event[T any] struct { + listeners map[Notifiable[T]]Notifiable[T] + eventMutex sync.Mutex +} + +func NewEvent[T any]() Event[T] { + return &event[T]{ + listeners: make(map[Notifiable[T]]Notifiable[T]), + } +} + +func (e *event[T]) AddListener(listener Notifiable[T]) { + e.eventMutex.Lock() + defer e.eventMutex.Unlock() + e.listeners[listener] = listener +} + +func (e *event[T]) RemoveListener(listener Notifiable[T]) { + e.eventMutex.Lock() + defer e.eventMutex.Unlock() + delete(e.listeners, listener) +} + +func (e *event[T]) Fire(event T) { + e.eventMutex.Lock() + defer e.eventMutex.Unlock() + for _, listener := range e.listeners { + // shadowing for loop variable, need to remove after golang 1.22 migration + listener := listener + go listener.Notify(event) + } +} diff --git a/pkg/events/emitter_test.go b/pkg/events/emitter_test.go new file mode 100644 index 0000000000..c63a03c17d --- /dev/null +++ b/pkg/events/emitter_test.go @@ -0,0 +1,41 @@ +package events + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +type TestEventData struct { + Foo string +} + +type eventListener struct { + notifyCb func(e TestEventData) +} + +func newEventListener(notifyCb func(e TestEventData)) *eventListener { + return &eventListener{notifyCb: notifyCb} +} + +func (listener *eventListener) Notify(e TestEventData) { + listener.notifyCb(e) +} + +func TestEmitter(t *testing.T) { + var wg sync.WaitGroup + wg.Add(1) + var data string + + event := NewEvent[TestEventData]() + event.AddListener(newEventListener(func(e TestEventData) { + data = e.Foo + wg.Done() + })) + event.Fire(TestEventData{Foo: "bar"}) + + wg.Wait() + + assert.Equal(t, "bar", data) +} diff --git a/pkg/events/events.go b/pkg/events/events.go new file mode 100644 index 0000000000..42a9865c7b --- /dev/null +++ b/pkg/events/events.go @@ -0,0 +1,14 @@ +package events + +import ( + "github.com/crc-org/crc/v2/pkg/crc/machine/state" +) + +type StatusChangedEvent struct { + State state.State + Error error +} + +var ( + StatusChanged = NewEvent[StatusChangedEvent]() +)