diff --git a/net/stream/jetstream/stream.go b/net/stream/jetstream/stream.go index 4608029..37048fd 100644 --- a/net/stream/jetstream/stream.go +++ b/net/stream/jetstream/stream.go @@ -20,7 +20,9 @@ type ( name string info *nats.StreamInfo config *nats.StreamConfig + configJSOptions []nats.JSOpt namespace string + jsOptions []nats.JSOpt natsOptions []nats.Option reconnectMaxRetries int reconnectTimeout time.Duration @@ -59,22 +61,30 @@ func WithReconnectMaxRetries(v int) Option { } } -func WithConfig(v *nats.StreamConfig) Option { +func WithConfig(v *nats.StreamConfig, opts ...nats.JSOpt) Option { return func(o *Stream) { o.config = v + o.configJSOptions = append(o.configJSOptions, opts...) + } +} + +// WithJSOptions option +func WithJSOptions(v ...nats.JSOpt) Option { + return func(o *Stream) { + o.jsOptions = append(o.jsOptions, v...) } } // WithNatsOptions option func WithNatsOptions(v ...nats.Option) Option { return func(o *Stream) { - o.natsOptions = v + o.natsOptions = append(o.natsOptions, v...) } } func PublisherWithPubOpts(v ...nats.PubOpt) PublisherOption { return func(o *Publisher) { - o.pubOpts = v + o.pubOpts = append(o.pubOpts, v...) } } @@ -98,7 +108,7 @@ func SubscriberWithNamespace(v string) SubscriberOption { func SubscriberWithSubOpts(v ...nats.SubOpt) SubscriberOption { return func(o *Subscriber) { - o.opts = v + o.opts = append(o.opts, v...) } } @@ -118,9 +128,14 @@ func (s *Stream) connect() error { // create jet stream js, err := conn.JetStream( - nats.PublishAsyncErrHandler(func(js nats.JetStream, msg *nats.Msg, err error) { - s.l.Error("nats async publish error", log.FError(err)) - }), + append( + []nats.JSOpt{ + nats.PublishAsyncErrHandler(func(js nats.JetStream, msg *nats.Msg, err error) { + s.l.Error("nats async publish error", log.FError(err)) + }), + }, + s.jsOptions..., + )..., ) if err != nil { return err @@ -130,8 +145,8 @@ func (s *Stream) connect() error { // create / update stream if config exists if s.config != nil { s.config.Name = s.Name() - if _, err = js.StreamInfo(s.Name()); errors.Is(err, nats.ErrStreamNotFound) { - if info, err := js.AddStream(s.config); err != nil { + if _, err = js.StreamInfo(s.Name(), s.configJSOptions...); errors.Is(err, nats.ErrStreamNotFound) { + if info, err := js.AddStream(s.config, s.configJSOptions...); err != nil { return errors.Wrap(err, "failed to add stream") } else if err != nil { return errors.Wrap(err, "failed to retrieve stream info") @@ -140,7 +155,7 @@ func (s *Stream) connect() error { } } else if err != nil { return errors.Wrap(err, "failed get stream info") - } else if info, err := js.UpdateStream(s.config); err != nil { + } else if info, err := js.UpdateStream(s.config, s.configJSOptions...); err != nil { return errors.Wrap(err, "failed to update stream") } else { s.info = info