Skip to content

Commit

Permalink
feat: add nats config js opts
Browse files Browse the repository at this point in the history
  • Loading branch information
franklinkim committed Jun 1, 2023
1 parent 39df4c2 commit 6f5d8f9
Showing 1 changed file with 25 additions and 10 deletions.
35 changes: 25 additions & 10 deletions net/stream/jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ type (
name string
info *nats.StreamInfo
config *nats.StreamConfig
configJSOptions []nats.JSOpt
namespace string
jsOptions []nats.JSOpt
natsOptions []nats.Option
reconnectMaxRetries int
reconnectTimeout time.Duration
Expand Down Expand Up @@ -59,22 +61,30 @@ func WithReconnectMaxRetries(v int) Option {
}
}

func WithConfig(v *nats.StreamConfig) Option {
func WithConfig(v *nats.StreamConfig, opts ...nats.JSOpt) Option {
return func(o *Stream) {
o.config = v
o.configJSOptions = append(o.configJSOptions, opts...)
}
}

// WithJSOptions option
func WithJSOptions(v ...nats.JSOpt) Option {
return func(o *Stream) {
o.jsOptions = append(o.jsOptions, v...)
}
}

// WithNatsOptions option
func WithNatsOptions(v ...nats.Option) Option {
return func(o *Stream) {
o.natsOptions = v
o.natsOptions = append(o.natsOptions, v...)
}
}

func PublisherWithPubOpts(v ...nats.PubOpt) PublisherOption {
return func(o *Publisher) {
o.pubOpts = v
o.pubOpts = append(o.pubOpts, v...)
}
}

Expand All @@ -98,7 +108,7 @@ func SubscriberWithNamespace(v string) SubscriberOption {

func SubscriberWithSubOpts(v ...nats.SubOpt) SubscriberOption {
return func(o *Subscriber) {
o.opts = v
o.opts = append(o.opts, v...)
}
}

Expand All @@ -118,9 +128,14 @@ func (s *Stream) connect() error {

// create jet stream
js, err := conn.JetStream(
nats.PublishAsyncErrHandler(func(js nats.JetStream, msg *nats.Msg, err error) {
s.l.Error("nats async publish error", log.FError(err))
}),
append(
[]nats.JSOpt{
nats.PublishAsyncErrHandler(func(js nats.JetStream, msg *nats.Msg, err error) {
s.l.Error("nats async publish error", log.FError(err))
}),
},
s.jsOptions...,
)...,
)
if err != nil {
return err
Expand All @@ -130,8 +145,8 @@ func (s *Stream) connect() error {
// create / update stream if config exists
if s.config != nil {
s.config.Name = s.Name()
if _, err = js.StreamInfo(s.Name()); errors.Is(err, nats.ErrStreamNotFound) {
if info, err := js.AddStream(s.config); err != nil {
if _, err = js.StreamInfo(s.Name(), s.configJSOptions...); errors.Is(err, nats.ErrStreamNotFound) {
if info, err := js.AddStream(s.config, s.configJSOptions...); err != nil {
return errors.Wrap(err, "failed to add stream")
} else if err != nil {
return errors.Wrap(err, "failed to retrieve stream info")
Expand All @@ -140,7 +155,7 @@ func (s *Stream) connect() error {
}
} else if err != nil {
return errors.Wrap(err, "failed get stream info")
} else if info, err := js.UpdateStream(s.config); err != nil {
} else if info, err := js.UpdateStream(s.config, s.configJSOptions...); err != nil {
return errors.Wrap(err, "failed to update stream")
} else {
s.info = info
Expand Down

0 comments on commit 6f5d8f9

Please sign in to comment.