diff --git a/publish.go b/publish.go index 11c0480..98f30ad 100644 --- a/publish.go +++ b/publish.go @@ -3,6 +3,7 @@ package rabbitmq import ( "fmt" "sync" + "time" amqp "github.com/rabbitmq/amqp091-go" ) @@ -36,14 +37,35 @@ type PublishOptions struct { Mandatory bool // Immediate fails to publish if there are no consumers // that can ack bound to the queue on the routing key - Immediate bool + Immediate bool + // MIME content type ContentType string - // Transient or Persistent + // Transient (0 or 1) or Persistent (2) DeliveryMode uint8 // Expiration time in ms that a message will expire from a queue. // See https://www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers Expiration string - Headers Table + // MIME content encoding + ContentEncoding string + // 0 to 9 + Priority uint8 + // correlation identifier + CorrelationID string + // address to to reply to (ex: RPC) + ReplyTo string + // message identifier + MessageID string + // message timestamp + Timestamp time.Time + // message type name + Type string + // creating user id - ex: "guest" + UserID string + // creating application id + AppID string + // Application or exchange specific fields, + // the headers exchange will inspect this field. + Headers Table } // WithPublishOptionsExchange returns a function that sets the exchange to publish to @@ -95,6 +117,69 @@ func WithPublishOptionsHeaders(headers Table) func(*PublishOptions) { } } +// WithPublishOptionsContentEncoding returns a function that sets the content encoding, i.e. "utf-8" +func WithPublishOptionsContentEncoding(contentEncoding string) func(*PublishOptions) { + return func(options *PublishOptions) { + options.ContentEncoding = contentEncoding + } +} + +// WithPublishOptionsPriority returns a function that sets the content priority from 0 to 9 +func WithPublishOptionsPriority(priority uint8) func(*PublishOptions) { + return func(options *PublishOptions) { + options.Priority = priority + } +} + +// WithPublishOptionsCorrelationID returns a function that sets the content correlation identifier +func WithPublishOptionsCorrelationID(correlationID string) func(*PublishOptions) { + return func(options *PublishOptions) { + options.CorrelationID = correlationID + } +} + +// WithPublishOptionsReplyTo returns a function that sets the reply to field +func WithPublishOptionsReplyTo(replyTo string) func(*PublishOptions) { + return func(options *PublishOptions) { + options.ReplyTo = replyTo + } +} + +// WithPublishOptionsMessageID returns a function that sets the message identifier +func WithPublishOptionsMessageID(messageID string) func(*PublishOptions) { + return func(options *PublishOptions) { + options.MessageID = messageID + } +} + +// WithPublishOptionsTimestamp returns a function that sets the timestamp for the message +func WithPublishOptionsTimestamp(timestamp time.Time) func(*PublishOptions) { + return func(options *PublishOptions) { + options.Timestamp = timestamp + } +} + +// WithPublishOptionsType returns a function that sets the message type name +func WithPublishOptionsType(messageType string) func(*PublishOptions) { + return func(options *PublishOptions) { + options.Type = messageType + } +} + +// WithPublishOptionsUserID returns a function that sets the user id i.e. "user" +func WithPublishOptionsUserID(userID string) func(*PublishOptions) { + return func(options *PublishOptions) { + options.UserID = userID + } +} + +// WithPublishOptionsAppID returns a function that sets the application id +func WithPublishOptionsAppID(appID string) func(*PublishOptions) { + return func(options *PublishOptions) { + options.AppID = appID + } +} + // Publisher allows you to publish messages safely across an open connection type Publisher struct { chManager *channelManager @@ -207,6 +292,15 @@ func (publisher *Publisher) Publish( message.Body = data message.Headers = tableToAMQPTable(options.Headers) message.Expiration = options.Expiration + message.ContentEncoding = options.ContentEncoding + message.Priority = options.Priority + message.CorrelationId = options.CorrelationID + message.ReplyTo = options.ReplyTo + message.MessageId = options.MessageID + message.Timestamp = options.Timestamp + message.Type = options.Type + message.UserId = options.UserID + message.AppId = options.AppID // Actual publish. err := publisher.chManager.channel.Publish(