Skip to content

Commit

Permalink
Merge pull request #182 from foomo/add-nats-js-opts
Browse files Browse the repository at this point in the history
Add nats js opts
  • Loading branch information
franklinkim authored Jun 1, 2023
2 parents c037d04 + 3d307d2 commit 7578ff2
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 10 deletions.
7 changes: 7 additions & 0 deletions log/fields_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ const (

// HTTPTrackingIDKey represents the HTTP tracking id if known (e.g. from X-Tracking-ID).
HTTPTrackingIDKey = "http_tracking_id"

// HTTPRefererKey identifies the address of the web page (i.e., the URI or IRI), from which the resource has been requested.
HTTPRefererKey = "http_referer"
)

func FHTTPServerName(id string) zap.Field {
Expand Down Expand Up @@ -98,6 +101,10 @@ func FHTTPUserAgent(userAgent string) zap.Field {
return zap.String(HTTPUserAgentKey, userAgent)
}

func FHTTPReferer(host string) zap.Field {
return zap.String(HTTPRefererKey, host)
}

func FHTTPHost(host string) zap.Field {
return zap.String(HTTPHostKey, host)
}
Expand Down
11 changes: 11 additions & 0 deletions log/with.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ func WithHTTPRequestID(l *zap.Logger, r *http.Request) *zap.Logger {
}
}

func WithHTTPReferer(l *zap.Logger, r *http.Request) *zap.Logger {
if value := r.Header.Get("X-Referer"); value != "" {
return With(l, FHTTPReferer(value))
} else if value := r.Referer(); value != "" {
return With(l, FHTTPHost(value))
} else {
return l
}
}

func WithHTTPHost(l *zap.Logger, r *http.Request) *zap.Logger {
if value := r.Header.Get("X-Forwarded-Host"); value != "" {
return With(l, FHTTPHost(value))
Expand Down Expand Up @@ -120,6 +130,7 @@ func WithHTTPClientIP(l *zap.Logger, r *http.Request) *zap.Logger {

func WithHTTPRequest(l *zap.Logger, r *http.Request) *zap.Logger {
l = WithHTTPHost(l, r)
l = WithHTTPReferer(l, r)
l = WithHTTPRequestID(l, r)
l = WithHTTPSessionID(l, r)
l = WithHTTPTrackingID(l, r)
Expand Down
35 changes: 25 additions & 10 deletions net/stream/jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ type (
name string
info *nats.StreamInfo
config *nats.StreamConfig
configJSOptions []nats.JSOpt
namespace string
jsOptions []nats.JSOpt
natsOptions []nats.Option
reconnectMaxRetries int
reconnectTimeout time.Duration
Expand Down Expand Up @@ -59,22 +61,30 @@ func WithReconnectMaxRetries(v int) Option {
}
}

func WithConfig(v *nats.StreamConfig) Option {
func WithConfig(v *nats.StreamConfig, opts ...nats.JSOpt) Option {
return func(o *Stream) {
o.config = v
o.configJSOptions = append(o.configJSOptions, opts...)
}
}

// WithJSOptions option
func WithJSOptions(v ...nats.JSOpt) Option {
return func(o *Stream) {
o.jsOptions = append(o.jsOptions, v...)
}
}

// WithNatsOptions option
func WithNatsOptions(v ...nats.Option) Option {
return func(o *Stream) {
o.natsOptions = v
o.natsOptions = append(o.natsOptions, v...)
}
}

func PublisherWithPubOpts(v ...nats.PubOpt) PublisherOption {
return func(o *Publisher) {
o.pubOpts = v
o.pubOpts = append(o.pubOpts, v...)
}
}

Expand All @@ -98,7 +108,7 @@ func SubscriberWithNamespace(v string) SubscriberOption {

func SubscriberWithSubOpts(v ...nats.SubOpt) SubscriberOption {
return func(o *Subscriber) {
o.opts = v
o.opts = append(o.opts, v...)
}
}

Expand All @@ -118,9 +128,14 @@ func (s *Stream) connect() error {

// create jet stream
js, err := conn.JetStream(
nats.PublishAsyncErrHandler(func(js nats.JetStream, msg *nats.Msg, err error) {
s.l.Error("nats async publish error", log.FError(err))
}),
append(
[]nats.JSOpt{
nats.PublishAsyncErrHandler(func(js nats.JetStream, msg *nats.Msg, err error) {
s.l.Error("nats async publish error", log.FError(err))
}),
},
s.jsOptions...,
)...,
)
if err != nil {
return err
Expand All @@ -130,8 +145,8 @@ func (s *Stream) connect() error {
// create / update stream if config exists
if s.config != nil {
s.config.Name = s.Name()
if _, err = js.StreamInfo(s.Name()); errors.Is(err, nats.ErrStreamNotFound) {
if info, err := js.AddStream(s.config); err != nil {
if _, err = js.StreamInfo(s.Name(), s.configJSOptions...); errors.Is(err, nats.ErrStreamNotFound) {
if info, err := js.AddStream(s.config, s.configJSOptions...); err != nil {
return errors.Wrap(err, "failed to add stream")
} else if err != nil {
return errors.Wrap(err, "failed to retrieve stream info")
Expand All @@ -140,7 +155,7 @@ func (s *Stream) connect() error {
}
} else if err != nil {
return errors.Wrap(err, "failed get stream info")
} else if info, err := js.UpdateStream(s.config); err != nil {
} else if info, err := js.UpdateStream(s.config, s.configJSOptions...); err != nil {
return errors.Wrap(err, "failed to update stream")
} else {
s.info = info
Expand Down

0 comments on commit 7578ff2

Please sign in to comment.