Skip to content

Commit

Permalink
Handle puback and response waiting in one place
Browse files Browse the repository at this point in the history
  • Loading branch information
tulir committed Feb 6, 2024
1 parent 39748d3 commit 56bcbff
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 37 deletions.
15 changes: 2 additions & 13 deletions messagix/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,11 @@ func (s *Socket) handleReadyEvent(data *Event_Ready) error {
return fmt.Errorf("failed to get app settings JSON: %w", err)
}

packetId, err := s.sendPublishPacket(LS_APP_SETTINGS, appSettingPublishJSON, &packets.PublishPacket{QOSLevel: packets.QOS_LEVEL_1}, s.SafePacketId())
_, err = s.sendPublishPacket(LS_APP_SETTINGS, appSettingPublishJSON, &packets.PublishPacket{QOSLevel: packets.QOS_LEVEL_1}, s.SafePacketId())
if err != nil {
return fmt.Errorf("failed to send app settings packet: %w", err)
}

appSettingAck := s.responseHandler.waitForPubACKDetails(packetId)
if appSettingAck == nil {
return fmt.Errorf("didn't get ack for app settings packet %d", packetId)
}

_, err = s.sendSubscribePacket(LS_FOREGROUND_STATE, packets.QOS_LEVEL_0, true)
if err != nil {
return fmt.Errorf("failed to subscribe to /ls_foreground_state: %w", err)
Expand Down Expand Up @@ -101,16 +96,11 @@ func (s *Socket) handleReadyEvent(data *Event_Ready) error {
}

s.client.Logger.Trace().Any("data", string(payload)).Msg("Sync groups tasks")
packetId, err = s.makeLSRequest(payload, 3)
_, err = s.makeLSRequest(payload, 3)
if err != nil {
return fmt.Errorf("failed to send sync tasks: %w", err)
}

resp := s.responseHandler.waitForPubResponseDetails(packetId)
if resp == nil {
return fmt.Errorf("didn't receive response to sync task %d", packetId)
}

_, err = s.client.ExecuteTasks(&socket.ReportAppStateTask{AppState: table.FOREGROUND, RequestId: uuid.NewString()})
if err != nil {
return fmt.Errorf("failed to report app state: %w", err)
Expand Down Expand Up @@ -139,7 +129,6 @@ func (s *Socket) handleACKEvent(ackData AckEvent) {
func (s *Socket) handlePublishResponseEvent(resp *Event_PublishResponse, qos packets.QoS) {
packetId := resp.Data.RequestID
hasPacket := s.responseHandler.hasPacket(uint16(packetId))
// s.client.Logger.Debug().Any("packetId", packetId).Any("resp", resp).Msg("got response!")
switch resp.Topic {
case string(LS_RESP):
resp.Finish()
Expand Down
6 changes: 3 additions & 3 deletions messagix/responsehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type ResponseHandler struct {
}

func (p *ResponseHandler) hasPacket(packetId uint16) bool {
_, ok := p.packetChannels[packetId]
_, ok := p.requestChannels[packetId]
return ok
}

Expand Down Expand Up @@ -70,8 +70,8 @@ func (p *ResponseHandler) waitForDetails(packetId uint16, channelType ChannelTyp
return response
case <-time.After(packetTimeout):
p.deleteDetails(packetId, channelType)
// TODO this is completely wrong, it should be a proper error
return &Event_PublishResponse{}
// TODO this should probably be an error
return nil
}
}

Expand Down
27 changes: 20 additions & 7 deletions messagix/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,18 @@ func (s *Socket) sendPublishPacket(topic Topic, jsonData string, packet *packets
return packetId, err
}

s.responseHandler.addPacketChannel(packetId)
// TODO this should probably wait for the puback packet
return packetId, s.sendData(publishRequestPayload)
err = s.sendData(publishRequestPayload)
if err != nil {
s.responseHandler.deleteDetails(packetId, PacketChannel)
s.responseHandler.deleteDetails(packetId, RequestChannel)
return packetId, err
}
ack := s.responseHandler.waitForPubACKDetails(packetId)
if ack == nil {
s.responseHandler.deleteDetails(packetId, RequestChannel)
return packetId, fmt.Errorf("puback timeout")
}
return packetId, nil
}

type SocketLSRequestPayload struct {
Expand All @@ -346,7 +355,7 @@ type SocketLSRequestPayload struct {
Type int `json:"type"`
}

func (s *Socket) makeLSRequest(payload []byte, t int) (uint16, error) {
func (s *Socket) makeLSRequest(payload []byte, t int) (*Event_PublishResponse, error) {
packetId := s.SafePacketId()
lsPayload := &SocketLSRequestPayload{
AppId: s.client.configs.browserConfigTable.CurrentUserInitialData.AppID,
Expand All @@ -357,15 +366,19 @@ func (s *Socket) makeLSRequest(payload []byte, t int) (uint16, error) {

jsonPayload, err := json.Marshal(lsPayload)
if err != nil {
return 0, err
return nil, err
}

_, err = s.sendPublishPacket(LS_REQ, string(jsonPayload), &packets.PublishPacket{QOSLevel: packets.QOS_LEVEL_1}, packetId)
if err != nil {
return 0, err
return nil, err
}

return packetId, nil
resp := s.responseHandler.waitForPubResponseDetails(packetId)
if resp == nil {
return nil, fmt.Errorf("publish response timeout")
}
return resp, nil
}

func (s *Socket) getConnHeaders() http.Header {
Expand Down
6 changes: 1 addition & 5 deletions messagix/syncManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,11 @@ func (sm *SyncManager) SyncSocketData(databaseId int64, db *socket.QueryMetadata
return nil, fmt.Errorf("failed to marshal DatabaseQuery struct into json bytes (databaseId=%d): %v", databaseId, err)
}

packetId, err := sm.client.socket.makeLSRequest(jsonPayload, t)
resp, err := sm.client.socket.makeLSRequest(jsonPayload, t)
if err != nil {
return nil, fmt.Errorf("failed to make lightspeed socket request with DatabaseQuery byte payload (databaseId=%d): %v", databaseId, err)
}

resp := sm.client.socket.responseHandler.waitForPubResponseDetails(packetId)
if resp == nil {
return nil, fmt.Errorf("timed out while waiting for sync response from socket (databaseId=%d)", databaseId)
}
resp.Finish()

if len(resp.Table.LSHandleSyncFailure) > 0 {
Expand Down
12 changes: 3 additions & 9 deletions messagix/threads.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,14 @@ func (c *Client) ExecuteTasks(tasks ...socket.Task) (*table.LSTable, error) {

payload, err := tskm.FinalizePayload()
if err != nil {
return nil, fmt.Errorf("failed to finalize payload for SendMessageTask: %v", err)
return nil, fmt.Errorf("failed to finalize payload: %v", err)
}

packetId, err := c.socket.makeLSRequest(payload, 3)
resp, err := c.socket.makeLSRequest(payload, 3)
if err != nil {
return nil, fmt.Errorf("failed to make LS request for SendMessageTask: %v", err)
return nil, fmt.Errorf("failed to send LS request: %v", err)
}

resp := c.socket.responseHandler.waitForPubResponseDetails(packetId)
if resp == nil {
return nil, fmt.Errorf("failed to receive response from socket after sending SendMessageTask. packetId: %d", packetId)
} else if resp.Topic == "" {
return nil, fmt.Errorf("request timed out")
}
resp.Finish()

return resp.Table, nil
Expand Down

0 comments on commit 56bcbff

Please sign in to comment.