diff --git a/pkg/session/stream.go b/pkg/session/stream.go index 49bd8d2..f49938c 100644 --- a/pkg/session/stream.go +++ b/pkg/session/stream.go @@ -66,9 +66,15 @@ func stream(ctx context.Context, dir Direction, r, w net.Conn, h Handler, ic Int } default: if p, ok := pkt.(*packets.PublishPacket); ok { - if err = h.AuthPublish(ctx, &p.TopicName, &p.Payload); err != nil { + topics := []string{p.TopicName} + // The broker sends subscription messages to the client as Publish Packets. + // We need to check if the Publish packet sent by the broker is allowed to be received to by the client. + // Therefore, we are using handler.AuthSubscribe instead of handler.AuthPublish. + if err = h.AuthSubscribe(ctx, &topics); err != nil { pkt = packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket) - err = pkt.Write(w) + if wErr := pkt.Write(w); wErr != nil { + err = errors.Join(err, wErr) + } errs <- wrap(ctx, err, dir) return }