From 6f5d8f9253720a9743bb5f15d955c123529f8754 Mon Sep 17 00:00:00 2001 From: franklin Date: Thu, 1 Jun 2023 11:43:51 +0200 Subject: [PATCH 1/2] feat: add nats config js opts --- net/stream/jetstream/stream.go | 35 ++++++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/net/stream/jetstream/stream.go b/net/stream/jetstream/stream.go index 4608029..37048fd 100644 --- a/net/stream/jetstream/stream.go +++ b/net/stream/jetstream/stream.go @@ -20,7 +20,9 @@ type ( name string info *nats.StreamInfo config *nats.StreamConfig + configJSOptions []nats.JSOpt namespace string + jsOptions []nats.JSOpt natsOptions []nats.Option reconnectMaxRetries int reconnectTimeout time.Duration @@ -59,22 +61,30 @@ func WithReconnectMaxRetries(v int) Option { } } -func WithConfig(v *nats.StreamConfig) Option { +func WithConfig(v *nats.StreamConfig, opts ...nats.JSOpt) Option { return func(o *Stream) { o.config = v + o.configJSOptions = append(o.configJSOptions, opts...) + } +} + +// WithJSOptions option +func WithJSOptions(v ...nats.JSOpt) Option { + return func(o *Stream) { + o.jsOptions = append(o.jsOptions, v...) } } // WithNatsOptions option func WithNatsOptions(v ...nats.Option) Option { return func(o *Stream) { - o.natsOptions = v + o.natsOptions = append(o.natsOptions, v...) } } func PublisherWithPubOpts(v ...nats.PubOpt) PublisherOption { return func(o *Publisher) { - o.pubOpts = v + o.pubOpts = append(o.pubOpts, v...) } } @@ -98,7 +108,7 @@ func SubscriberWithNamespace(v string) SubscriberOption { func SubscriberWithSubOpts(v ...nats.SubOpt) SubscriberOption { return func(o *Subscriber) { - o.opts = v + o.opts = append(o.opts, v...) } } @@ -118,9 +128,14 @@ func (s *Stream) connect() error { // create jet stream js, err := conn.JetStream( - nats.PublishAsyncErrHandler(func(js nats.JetStream, msg *nats.Msg, err error) { - s.l.Error("nats async publish error", log.FError(err)) - }), + append( + []nats.JSOpt{ + nats.PublishAsyncErrHandler(func(js nats.JetStream, msg *nats.Msg, err error) { + s.l.Error("nats async publish error", log.FError(err)) + }), + }, + s.jsOptions..., + )..., ) if err != nil { return err @@ -130,8 +145,8 @@ func (s *Stream) connect() error { // create / update stream if config exists if s.config != nil { s.config.Name = s.Name() - if _, err = js.StreamInfo(s.Name()); errors.Is(err, nats.ErrStreamNotFound) { - if info, err := js.AddStream(s.config); err != nil { + if _, err = js.StreamInfo(s.Name(), s.configJSOptions...); errors.Is(err, nats.ErrStreamNotFound) { + if info, err := js.AddStream(s.config, s.configJSOptions...); err != nil { return errors.Wrap(err, "failed to add stream") } else if err != nil { return errors.Wrap(err, "failed to retrieve stream info") @@ -140,7 +155,7 @@ func (s *Stream) connect() error { } } else if err != nil { return errors.Wrap(err, "failed get stream info") - } else if info, err := js.UpdateStream(s.config); err != nil { + } else if info, err := js.UpdateStream(s.config, s.configJSOptions...); err != nil { return errors.Wrap(err, "failed to update stream") } else { s.info = info From 8366985ca3fca48a9eeb7da44052ddaea64d5cf1 Mon Sep 17 00:00:00 2001 From: franklin Date: Thu, 1 Jun 2023 11:44:14 +0200 Subject: [PATCH 2/2] feat: add referer --- log/fields_http.go | 7 +++++++ log/with.go | 11 +++++++++++ 2 files changed, 18 insertions(+) diff --git a/log/fields_http.go b/log/fields_http.go index 04e7923..d49704b 100644 --- a/log/fields_http.go +++ b/log/fields_http.go @@ -48,6 +48,9 @@ const ( // HTTPTrackingIDKey represents the HTTP tracking id if known (e.g. from X-Tracking-ID). HTTPTrackingIDKey = "http_tracking_id" + + // HTTPRefererKey identifies the address of the web page (i.e., the URI or IRI), from which the resource has been requested. + HTTPRefererKey = "http_referer" ) func FHTTPServerName(id string) zap.Field { @@ -98,6 +101,10 @@ func FHTTPUserAgent(userAgent string) zap.Field { return zap.String(HTTPUserAgentKey, userAgent) } +func FHTTPReferer(host string) zap.Field { + return zap.String(HTTPRefererKey, host) +} + func FHTTPHost(host string) zap.Field { return zap.String(HTTPHostKey, host) } diff --git a/log/with.go b/log/with.go index 1322792..d95886f 100644 --- a/log/with.go +++ b/log/with.go @@ -77,6 +77,16 @@ func WithHTTPRequestID(l *zap.Logger, r *http.Request) *zap.Logger { } } +func WithHTTPReferer(l *zap.Logger, r *http.Request) *zap.Logger { + if value := r.Header.Get("X-Referer"); value != "" { + return With(l, FHTTPReferer(value)) + } else if value := r.Referer(); value != "" { + return With(l, FHTTPHost(value)) + } else { + return l + } +} + func WithHTTPHost(l *zap.Logger, r *http.Request) *zap.Logger { if value := r.Header.Get("X-Forwarded-Host"); value != "" { return With(l, FHTTPHost(value)) @@ -120,6 +130,7 @@ func WithHTTPClientIP(l *zap.Logger, r *http.Request) *zap.Logger { func WithHTTPRequest(l *zap.Logger, r *http.Request) *zap.Logger { l = WithHTTPHost(l, r) + l = WithHTTPReferer(l, r) l = WithHTTPRequestID(l, r) l = WithHTTPSessionID(l, r) l = WithHTTPTrackingID(l, r)