From 409d99f785944b36ed83ffd53d98059842ff5b9d Mon Sep 17 00:00:00 2001 From: Kevin Burke Date: Wed, 10 Apr 2024 20:58:21 -0700 Subject: [PATCH] all: switch to using log/slog instead of log This lets us print structured errors and also introduce leveled logs. --- consumer.go | 47 ++++++++++++++++++++++---------------------- nsqlookup/handler.go | 36 ++++++++++++++++++++++++++------- producer.go | 8 ++++---- 3 files changed, 56 insertions(+), 35 deletions(-) diff --git a/consumer.go b/consumer.go index 566d9a1..f7737c1 100644 --- a/consumer.go +++ b/consumer.go @@ -2,7 +2,7 @@ package nsq import ( "io" - "log" + "log/slog" "net" "net/http" "strconv" @@ -175,32 +175,32 @@ func (c *Consumer) run() { defer ticker.Stop() if err := c.pulse(); err != nil { - log.Print(err) + slog.Error("pulse error", "err", err) } for { select { case <-ticker.C: if err := c.pulse(); err != nil { - log.Print(err) + slog.Error("pulse error", "err", err) } case <-c.done: - log.Println("Consumer initiating shutdown sequence") + slog.Info("Consumer initiating shutdown sequence") // Send a CLS to all Cmd Channels for all connections c.close() - log.Println("draining and re-queueing in-flight messages and awaiting connection waitgroup") + slog.Info("draining and re-queueing in-flight messages and awaiting connection waitgroup") // Drain and re-queue any in-flight messages until all runConn routines return c.drainAndJoinAwait() // At this point all runConn routines have returned, therefore we know // we won't be receiving any new messages from nsqd servers. // But we potentially could have some messages in c.msgs // We can safely close the c.msgs channel and requeue the remaining messages. - log.Println("draining and requeueing remaining in-flight messages") + slog.Info("draining and requeueing remaining in-flight messages") // drain and requeue any remaining in-flight messages close(c.msgs) c.drainRemaining() - log.Println("closing and cleaning up connections") + slog.Info("closing and cleaning up connections") // Cleanup remaining connections c.mtx.Lock() connCloseWg := sync.WaitGroup{} @@ -219,28 +219,27 @@ func (c *Consumer) run() { start := time.Now() for len(cm.CmdChan) > 0 { if time.Since(start) > c.drainTimeout { - log.Println("failed to drain CmdChan for connection, closing now") + slog.Info("failed to drain CmdChan for connection, closing now") break } - log.Println("waiting for write channel to flush any requeue commands") + slog.Info("waiting for write channel to flush any requeue commands") time.Sleep(time.Millisecond * 500) } closeCommand(cm.CmdChan) err := cm.Con.Close() if err != nil { - log.Printf("error returned from connection close %+s", err.Error()) + slog.Error("error closing connection", "err", err) } connCloseWg.Done() }(cm) } c.mtx.Unlock() - success := c.await(&connCloseWg, c.drainTimeout) - if success { - log.Println("successfully flushed all connections") + if success := c.await(&connCloseWg, c.drainTimeout); success { + slog.Info("successfully flushed all connections") } else { - log.Println("timed out awaiting connections flush and close") + slog.Warn("timed out awaiting connections flush and close") } - log.Println("Consumer exiting run") + slog.Info("Consumer exiting run") // Signal to the stop() function that orderly shutdown is complete c.shutJoin.Done() return @@ -280,7 +279,7 @@ func (c *Consumer) drainAndJoinAwait() { return case m, ok := <-c.msgs: if ok { - log.Printf("requeueing %+v\n", m.ID.String()) + slog.Info("requeueing message", "msg", m.ID) sendCommand(m.cmdChan, Req{MessageID: m.ID}) } } @@ -291,7 +290,7 @@ func (c *Consumer) drainAndJoinAwait() { // channel and issues a REQ command for each. func (c *Consumer) drainRemaining() { for m := range c.msgs { - log.Printf("requeueing %+v\n", m.ID.String()) + slog.Info("requeueing message", "msg", m.ID) sendCommand(m.cmdChan, Req{MessageID: m.ID}) } } @@ -332,7 +331,7 @@ func (c *Consumer) pulse() (err error) { cmdChan := make(chan Command, c.maxInFlight+2) conn, err := c.getConn(addr) if err != nil { - log.Printf("failed to connect to %s: %s", addr, err) + slog.Error("failed to connect", "addr", addr, "err", err) continue } cm := connMeta{CmdChan: cmdChan, Con: conn} @@ -348,7 +347,7 @@ func (c *Consumer) pulse() (err error) { } func (c *Consumer) close() { - log.Println("sending CLS to all command channels") + slog.Info("sending CLS to all command channels") c.mtx.Lock() for _, cm := range c.conns { sendCommand(cm.CmdChan, Cls{}) @@ -403,7 +402,7 @@ func (c *Consumer) runConn(conn *Conn, addr string, cmdChan chan Command) { if frame, err = conn.ReadFrame(); err != nil { if err != io.EOF && err != io.ErrUnexpectedEOF { - log.Print(err) + slog.Error("could not read frame", "err", err) } return } @@ -424,16 +423,16 @@ func (c *Consumer) runConn(conn *Conn, addr string, cmdChan chan Command) { return default: - log.Printf("closing connection after receiving an unexpected response from %s: %s", conn.RemoteAddr(), f) + slog.Error("closing connection after receiving an unexpected response", "remote_addr", conn.RemoteAddr(), "response", f) return } case Error: - log.Printf("closing connection after receiving an error from %s: %s", conn.RemoteAddr(), f) + slog.Error("closing connection after receiving an error", "remote_addr", conn.RemoteAddr(), "response", f) return default: - log.Printf("closing connection after receiving an unsupported frame from %s: %s", conn.RemoteAddr(), f.FrameType()) + slog.Error("closing connection after receiving an unsupported frame", "remote_addr", conn.RemoteAddr(), "response", f) return } } @@ -449,7 +448,7 @@ func (c *Consumer) writeConn(conn *Conn, cmdChan chan Command) { for cmd := range cmdChan { if err := c.writeConnCommand(conn, cmd); err != nil { - log.Print(err) + slog.Error("could not write command to channel", "cmd", cmd, "err", err) return } } diff --git a/nsqlookup/handler.go b/nsqlookup/handler.go index fe54c53..51657e1 100644 --- a/nsqlookup/handler.go +++ b/nsqlookup/handler.go @@ -6,7 +6,7 @@ import ( "encoding/json" "errors" "io" - "log" + "log/slog" "net" "net/http" "os" @@ -507,7 +507,11 @@ func (h TCPHandler) ServeConn(ctx context.Context, conn net.Conn) { defer func() { if node != nil { err := node.Unregister(ctx) - log.Printf("UNREGISTER node = %s, err = %s", node, err) + if err != nil { + slog.Error("error unregistering node", "node", node, "err", err) + } else { + slog.Info("unregistered node", "node", node) + } } }() defer close(resChan) @@ -550,7 +554,7 @@ func (h TCPHandler) ServeConn(ctx context.Context, conn net.Conn) { case Error: res = e default: - log.Print(err) + slog.Error("unknown command error", "cmd", cmd, "err", e) return } } @@ -582,7 +586,11 @@ func (h TCPHandler) identify(ctx context.Context, node Node, info NodeInfo, conn res = RawResponse(b) id, err = h.Engine.RegisterNode(ctx, info) - log.Printf("IDENTIFY node = %s, err = %v", info, err) + if err != nil { + slog.Error("identify node error", "node", info, "err", err) + } else { + slog.Info("identify node", "node", node) + } return } @@ -591,11 +599,25 @@ func (h TCPHandler) ping(ctx context.Context, node Node) (res OK, err error) { ctx, cancel := context.WithTimeout(ctx, h.EngineTimeout) defer cancel() err = node.Ping(ctx) - log.Printf("PING node = %s, err = %v", node, err) + if err != nil { + slog.Error("send ping error", "node", node, "err", err) + } else { + slog.Info("send ping", "node", node) + } } return } +func infoOrErr(msg string, args ...any) { + for i, arg := range args { + if arg == "err" && args[i+1] != nil { + slog.Error(msg, args...) + return + } + } + slog.Info(msg, args...) +} + func (h TCPHandler) register(ctx context.Context, node Node, topic string, channel string) (res OK, err error) { if node == nil { err = errClientMustIdentify @@ -616,7 +638,7 @@ func (h TCPHandler) register(ctx context.Context, node Node, topic string, chann err = makeErrBadTopic("missing topic name") } - log.Printf("REGISTER node = %s, topic = %s, channel = %s, err = %v", node, topic, channel, err) + infoOrErr("register", "node", node, "topic", topic, "channel", channel, "err", err) return } @@ -644,7 +666,7 @@ func (h TCPHandler) unregister(ctx context.Context, node Node, topic string, cha id = node } - log.Printf("UNREGISTER node = %s, topic = %s, channel = %s, err = %v", node, topic, channel, err) + infoOrErr("unregister node", "node", node, "topic", topic, "channel", channel, "err", err) return } diff --git a/producer.go b/producer.go index 9c56eb5..0d22239 100644 --- a/producer.go +++ b/producer.go @@ -1,7 +1,7 @@ package nsq import ( - "log" + "log/slog" "sync" "sync/atomic" "time" @@ -240,15 +240,15 @@ func (p *Producer) run() { } if err != nil { - log.Printf("closing nsqd connection to %s: %s", p.address, err) + slog.Error("closing nsqd connection", "addr", p.address, "err", err) } } connect := func() (err error) { - log.Printf("opening nsqd connection to %s", p.address) + slog.Info("opening nsqd connection", "addr", p.address) if conn, err = DialTimeout(p.address, p.dialTimeout); err != nil { - log.Printf("failed to connect to nsqd at %s: %s", p.address, err) + slog.Error("failed to connect to nsqd", "addr", p.address, "err", err) return }