Skip to content

Commit

Permalink
Add todos for fixing response waiters
Browse files Browse the repository at this point in the history
  • Loading branch information
tulir committed Jan 28, 2024
1 parent e510d0d commit dd8d225
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 1 deletion.
1 change: 1 addition & 0 deletions messagix/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ func (pb *Event_PublishResponse) Finish() ResponseData {
Int("payload_length", len(pb.Data.Payload)).
Str("topic", pb.Topic).
Int64("request_id", pb.Data.RequestID).
Uint16("mqtt_message_id", pb.MessageIdentifier).
Strs("sp", pb.Data.Sp).
Int("target", pb.Data.Target)
if len(pb.Data.Payload) < 8192 {
Expand Down
1 change: 1 addition & 0 deletions messagix/responsehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (p *ResponseHandler) waitForDetails(packetId uint16, channelType ChannelTyp
return response
case <-time.After(packetTimeout):
p.deleteDetails(packetId, channelType)
// TODO this is completely wrong, it should be a proper error
return &Event_PublishResponse{}
}
}
Expand Down
3 changes: 2 additions & 1 deletion messagix/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (s *Socket) BuildBrokerUrl() string {
}

const pongTimeout = 30 * time.Second
const packetTimeout = 10 * time.Second
const packetTimeout = 30 * time.Second
const pingInterval = 10 * time.Second

func ptr[T any](val T) *T {
Expand Down Expand Up @@ -323,6 +323,7 @@ func (s *Socket) sendPublishPacket(topic Topic, jsonData string, packet *packets
}

s.responseHandler.addPacketChannel(packetId)
// TODO this should probably wait for the puback packet
return packetId, s.sendData(publishRequestPayload)
}

Expand Down
2 changes: 2 additions & 0 deletions messagix/threads.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ func (c *Client) ExecuteTasks(tasks ...socket.Task) (*table.LSTable, error) {
resp := c.socket.responseHandler.waitForPubResponseDetails(packetId)
if resp == nil {
return nil, fmt.Errorf("failed to receive response from socket after sending SendMessageTask. packetId: %d", packetId)
} else if resp.Topic == "" {
return nil, fmt.Errorf("request timed out")
}
resp.Finish()

Expand Down

0 comments on commit dd8d225

Please sign in to comment.