From dd8d225aa979fda8e901f144d3b18819a2ea0c55 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Sun, 28 Jan 2024 22:31:55 +0200 Subject: [PATCH] Add todos for fixing response waiters --- messagix/events.go | 1 + messagix/responsehandler.go | 1 + messagix/socket.go | 3 ++- messagix/threads.go | 2 ++ 4 files changed, 6 insertions(+), 1 deletion(-) diff --git a/messagix/events.go b/messagix/events.go index 613a575..c9a03fe 100644 --- a/messagix/events.go +++ b/messagix/events.go @@ -279,6 +279,7 @@ func (pb *Event_PublishResponse) Finish() ResponseData { Int("payload_length", len(pb.Data.Payload)). Str("topic", pb.Topic). Int64("request_id", pb.Data.RequestID). + Uint16("mqtt_message_id", pb.MessageIdentifier). Strs("sp", pb.Data.Sp). Int("target", pb.Data.Target) if len(pb.Data.Payload) < 8192 { diff --git a/messagix/responsehandler.go b/messagix/responsehandler.go index 1054efd..eb52e55 100644 --- a/messagix/responsehandler.go +++ b/messagix/responsehandler.go @@ -70,6 +70,7 @@ func (p *ResponseHandler) waitForDetails(packetId uint16, channelType ChannelTyp return response case <-time.After(packetTimeout): p.deleteDetails(packetId, channelType) + // TODO this is completely wrong, it should be a proper error return &Event_PublishResponse{} } } diff --git a/messagix/socket.go b/messagix/socket.go index e64ca3b..2bfb2a2 100644 --- a/messagix/socket.go +++ b/messagix/socket.go @@ -125,7 +125,7 @@ func (s *Socket) BuildBrokerUrl() string { } const pongTimeout = 30 * time.Second -const packetTimeout = 10 * time.Second +const packetTimeout = 30 * time.Second const pingInterval = 10 * time.Second func ptr[T any](val T) *T { @@ -323,6 +323,7 @@ func (s *Socket) sendPublishPacket(topic Topic, jsonData string, packet *packets } s.responseHandler.addPacketChannel(packetId) + // TODO this should probably wait for the puback packet return packetId, s.sendData(publishRequestPayload) } diff --git a/messagix/threads.go b/messagix/threads.go index 463197b..38f4819 100644 --- a/messagix/threads.go +++ b/messagix/threads.go @@ -27,6 +27,8 @@ func (c *Client) ExecuteTasks(tasks ...socket.Task) (*table.LSTable, error) { resp := c.socket.responseHandler.waitForPubResponseDetails(packetId) if resp == nil { return nil, fmt.Errorf("failed to receive response from socket after sending SendMessageTask. packetId: %d", packetId) + } else if resp.Topic == "" { + return nil, fmt.Errorf("request timed out") } resp.Finish()