Skip to content

all: switch to using log/slog instead of log #54

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 23 additions & 24 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package nsq

import (
"io"
"log"
"log/slog"
"net"
"net/http"
"strconv"
Expand Down Expand Up @@ -175,32 +175,32 @@ func (c *Consumer) run() {
defer ticker.Stop()

if err := c.pulse(); err != nil {
log.Print(err)
slog.Error("pulse error", "err", err)
}

for {
select {
case <-ticker.C:
if err := c.pulse(); err != nil {
log.Print(err)
slog.Error("pulse error", "err", err)
}

case <-c.done:
log.Println("Consumer initiating shutdown sequence")
slog.Info("Consumer initiating shutdown sequence")
// Send a CLS to all Cmd Channels for all connections
c.close()
log.Println("draining and re-queueing in-flight messages and awaiting connection waitgroup")
slog.Info("draining and re-queueing in-flight messages and awaiting connection waitgroup")
// Drain and re-queue any in-flight messages until all runConn routines return
c.drainAndJoinAwait()
// At this point all runConn routines have returned, therefore we know
// we won't be receiving any new messages from nsqd servers.
// But we potentially could have some messages in c.msgs
// We can safely close the c.msgs channel and requeue the remaining messages.
log.Println("draining and requeueing remaining in-flight messages")
slog.Info("draining and requeueing remaining in-flight messages")
// drain and requeue any remaining in-flight messages
close(c.msgs)
c.drainRemaining()
log.Println("closing and cleaning up connections")
slog.Info("closing and cleaning up connections")
// Cleanup remaining connections
c.mtx.Lock()
connCloseWg := sync.WaitGroup{}
Expand All @@ -219,28 +219,27 @@ func (c *Consumer) run() {
start := time.Now()
for len(cm.CmdChan) > 0 {
if time.Since(start) > c.drainTimeout {
log.Println("failed to drain CmdChan for connection, closing now")
slog.Info("failed to drain CmdChan for connection, closing now")
break
}
log.Println("waiting for write channel to flush any requeue commands")
slog.Info("waiting for write channel to flush any requeue commands")
time.Sleep(time.Millisecond * 500)
}
closeCommand(cm.CmdChan)
err := cm.Con.Close()
if err != nil {
log.Printf("error returned from connection close %+s", err.Error())
slog.Error("error closing connection", "err", err)
}
connCloseWg.Done()
}(cm)
}
c.mtx.Unlock()
success := c.await(&connCloseWg, c.drainTimeout)
if success {
log.Println("successfully flushed all connections")
if success := c.await(&connCloseWg, c.drainTimeout); success {
slog.Info("successfully flushed all connections")
} else {
log.Println("timed out awaiting connections flush and close")
slog.Warn("timed out awaiting connections flush and close")
}
log.Println("Consumer exiting run")
slog.Info("Consumer exiting run")
// Signal to the stop() function that orderly shutdown is complete
c.shutJoin.Done()
return
Expand Down Expand Up @@ -280,7 +279,7 @@ func (c *Consumer) drainAndJoinAwait() {
return
case m, ok := <-c.msgs:
if ok {
log.Printf("requeueing %+v\n", m.ID.String())
slog.Info("requeueing message", "msg", m.ID)
sendCommand(m.cmdChan, Req{MessageID: m.ID})
}
}
Expand All @@ -291,7 +290,7 @@ func (c *Consumer) drainAndJoinAwait() {
// channel and issues a REQ command for each.
func (c *Consumer) drainRemaining() {
for m := range c.msgs {
log.Printf("requeueing %+v\n", m.ID.String())
slog.Info("requeueing message", "msg", m.ID)
sendCommand(m.cmdChan, Req{MessageID: m.ID})
}
}
Expand Down Expand Up @@ -332,7 +331,7 @@ func (c *Consumer) pulse() (err error) {
cmdChan := make(chan Command, c.maxInFlight+2)
conn, err := c.getConn(addr)
if err != nil {
log.Printf("failed to connect to %s: %s", addr, err)
slog.Error("failed to connect", "addr", addr, "err", err)
continue
}
cm := connMeta{CmdChan: cmdChan, Con: conn}
Expand All @@ -348,7 +347,7 @@ func (c *Consumer) pulse() (err error) {
}

func (c *Consumer) close() {
log.Println("sending CLS to all command channels")
slog.Info("sending CLS to all command channels")
c.mtx.Lock()
for _, cm := range c.conns {
sendCommand(cm.CmdChan, Cls{})
Expand Down Expand Up @@ -403,7 +402,7 @@ func (c *Consumer) runConn(conn *Conn, addr string, cmdChan chan Command) {

if frame, err = conn.ReadFrame(); err != nil {
if err != io.EOF && err != io.ErrUnexpectedEOF {
log.Print(err)
slog.Error("could not read frame", "err", err)
}
return
}
Expand All @@ -424,16 +423,16 @@ func (c *Consumer) runConn(conn *Conn, addr string, cmdChan chan Command) {
return

default:
log.Printf("closing connection after receiving an unexpected response from %s: %s", conn.RemoteAddr(), f)
slog.Error("closing connection after receiving an unexpected response", "remote_addr", conn.RemoteAddr(), "response", f)
return
}

case Error:
log.Printf("closing connection after receiving an error from %s: %s", conn.RemoteAddr(), f)
slog.Error("closing connection after receiving an error", "remote_addr", conn.RemoteAddr(), "response", f)
return

default:
log.Printf("closing connection after receiving an unsupported frame from %s: %s", conn.RemoteAddr(), f.FrameType())
slog.Error("closing connection after receiving an unsupported frame", "remote_addr", conn.RemoteAddr(), "response", f)
return
}
}
Expand All @@ -449,7 +448,7 @@ func (c *Consumer) writeConn(conn *Conn, cmdChan chan Command) {

for cmd := range cmdChan {
if err := c.writeConnCommand(conn, cmd); err != nil {
log.Print(err)
slog.Error("could not write command to channel", "cmd", cmd, "err", err)
return
}
}
Expand Down
36 changes: 29 additions & 7 deletions nsqlookup/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"encoding/json"
"errors"
"io"
"log"
"log/slog"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -507,7 +507,11 @@ func (h TCPHandler) ServeConn(ctx context.Context, conn net.Conn) {
defer func() {
if node != nil {
err := node.Unregister(ctx)
log.Printf("UNREGISTER node = %s, err = %s", node, err)
if err != nil {
slog.Error("error unregistering node", "node", node, "err", err)
} else {
slog.Info("unregistered node", "node", node)
}
}
}()
defer close(resChan)
Expand Down Expand Up @@ -550,7 +554,7 @@ func (h TCPHandler) ServeConn(ctx context.Context, conn net.Conn) {
case Error:
res = e
default:
log.Print(err)
slog.Error("unknown command error", "cmd", cmd, "err", e)
return
}
}
Expand Down Expand Up @@ -582,7 +586,11 @@ func (h TCPHandler) identify(ctx context.Context, node Node, info NodeInfo, conn
res = RawResponse(b)
id, err = h.Engine.RegisterNode(ctx, info)

log.Printf("IDENTIFY node = %s, err = %v", info, err)
if err != nil {
slog.Error("identify node error", "node", info, "err", err)
} else {
slog.Info("identify node", "node", node)
}
return
}

Expand All @@ -591,11 +599,25 @@ func (h TCPHandler) ping(ctx context.Context, node Node) (res OK, err error) {
ctx, cancel := context.WithTimeout(ctx, h.EngineTimeout)
defer cancel()
err = node.Ping(ctx)
log.Printf("PING node = %s, err = %v", node, err)
if err != nil {
slog.Error("send ping error", "node", node, "err", err)
} else {
slog.Info("send ping", "node", node)
}
}
return
}

func infoOrErr(msg string, args ...any) {
for i, arg := range args {
if arg == "err" && args[i+1] != nil {
slog.Error(msg, args...)
return
}
}
slog.Info(msg, args...)
}

func (h TCPHandler) register(ctx context.Context, node Node, topic string, channel string) (res OK, err error) {
if node == nil {
err = errClientMustIdentify
Expand All @@ -616,7 +638,7 @@ func (h TCPHandler) register(ctx context.Context, node Node, topic string, chann
err = makeErrBadTopic("missing topic name")
}

log.Printf("REGISTER node = %s, topic = %s, channel = %s, err = %v", node, topic, channel, err)
infoOrErr("register", "node", node, "topic", topic, "channel", channel, "err", err)
return
}

Expand Down Expand Up @@ -644,7 +666,7 @@ func (h TCPHandler) unregister(ctx context.Context, node Node, topic string, cha
id = node
}

log.Printf("UNREGISTER node = %s, topic = %s, channel = %s, err = %v", node, topic, channel, err)
infoOrErr("unregister node", "node", node, "topic", topic, "channel", channel, "err", err)
return
}

Expand Down
8 changes: 4 additions & 4 deletions producer.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package nsq

import (
"log"
"log/slog"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -240,15 +240,15 @@ func (p *Producer) run() {
}

if err != nil {
log.Printf("closing nsqd connection to %s: %s", p.address, err)
slog.Error("closing nsqd connection", "addr", p.address, "err", err)
}
}

connect := func() (err error) {
log.Printf("opening nsqd connection to %s", p.address)
slog.Info("opening nsqd connection", "addr", p.address)

if conn, err = DialTimeout(p.address, p.dialTimeout); err != nil {
log.Printf("failed to connect to nsqd at %s: %s", p.address, err)
slog.Error("failed to connect to nsqd", "addr", p.address, "err", err)
return
}

Expand Down
Loading