Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KV cross-account watch not receiving any messages #1784

Open
bredtape opened this issue Jan 22, 2025 · 1 comment
Open

KV cross-account watch not receiving any messages #1784

bredtape opened this issue Jan 22, 2025 · 1 comment
Assignees
Labels
defect Suspected defect such as a bug or regression

Comments

@bredtape
Copy link

bredtape commented Jan 22, 2025

Observed behavior

Given a nats-server configured with 2 accounts: KV1 and KV2. Then a key-value store is created in KV1 and exported and imported in KV2.

KV2 can perform Get but Watch (and related operations like WatchAll, ListKeys) will block with no messages delivered.

Also, the backing stream can successfully be read with the OrderedConsumer.

Expected behavior

KeyValue.Watch should deliver all existing entries, but blocks with no messages delivered.

Server and client version

nats-server v2.10.24
nats.go v1.38.0

Host environment

Ubuntu v24.04.1 LTS
Docker v27.5.0
Docker compose v2.32.4

Steps to reproduce

I have created a repository https://github.com/bredtape/nats_kv_watch_fail to recreate the issue. It includes an extra test TestKVWatchWhatWorks to document that the Get and the OrderedConsumer works.

nats server.conf:

jetstream {
  store_dir: /data/jetstream
}

accounts: {
  KV1 {
    jetstream: enable

    users = [{user: user1, password: password1}]

    exports = [
      {service: "$JS.API.>"}
      {stream: "KV_bucket1.>"}
    ]
  }

  KV2 {
    jetstream: enable

    users = [{user: user2, password: password2}]

    imports = [
      {service: {account: KV1, subject: "$JS.API.>"}, to: "import.>"}
      {stream:  {account: KV1, subject: "KV_bucket1.>"}}
    ]
  }
}

Golang unit test:

package nats_kv_watch

import (
	"context"
	"testing"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

const natsURL = "nats://nats:4222"

func TestKVWatchMinimalFails(t *testing.T) {
	ctx := context.Background()

	js1 := getKV1(t)

	bucket := "bucket1"
	kvs1, err := js1.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: bucket})
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("kv1, created kv: %+v", kvs1)

	rev, err := kvs1.Put(ctx, "key1", []byte("value1"))
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("kv1, put key1: %d", rev)

	t.Log("kv2, get")
	js2 := getKV2(t)
	kvs2, err := js2.KeyValue(ctx, bucket)
	if err != nil {
		t.Fatal(err)
	}

	watcher2, err := kvs2.WatchAll(ctx)
	if err != nil {
		t.Fatal(err)
	}

	t.Log("kv2, watch all keys")
	var result2 []jetstream.KeyValueEntry
	for kve := range watcher2.Updates() { // timeout here
		if kve == nil {
			break
		}
		result2 = append(result2, kve)
		t.Logf("kv2, watch received: %+v", kve)
	}

	if len(result2) != 1 {
		t.Fatalf("kvs2 watch all, expected 1 entry, got %d", len(result2))
	}
}

func getKV1(t *testing.T) jetstream.JetStream {
	t.Helper()

	conn, err := nats.Connect(natsURL, nats.UserInfo("user1", "password1"), nats.Name("kv1"), nats.RetryOnFailedConnect(true))
	if err != nil {
		t.Fatal("failed to connect", err)
	}

	js, err := jetstream.New(conn)
	if err != nil {
		t.Fatal(err)
	}
	return js
}

func getKV2(t *testing.T) jetstream.JetStream {
	t.Helper()

	conn, err := nats.Connect(natsURL, nats.UserInfo("user2", "password2"), nats.Name("kv2"), nats.RetryOnFailedConnect(true))
	if err != nil {
		t.Fatal(err)
	}

	trace := jetstream.WithClientTrace(&jetstream.ClientTrace{
		RequestSent: func(subj string, payload []byte) {
			t.Logf("kv2: request sent, subject=%s: %s", subj, string(payload))
		},
		ResponseReceived: func(subj string, payload []byte, hdr nats.Header) {
			t.Logf("kv2: response received, subject=%s, headers=%v: %s", subj, hdr, string(payload))
		}})

	js, err := jetstream.NewWithAPIPrefix(conn, "import", trace)
	if err != nil {
		t.Fatal(err)
	}
	return js
}

logs.txt

The logs indicates that a message is pending "num_pending:" 1:

test-1  |     kv_test.go:149: kv2: response received, subject=import.CONSUMER.CREATE.KV_bucket1.Vc2dPSpp.$KV.bucket1.>, headers=map[]: {"type":"io.nats.jetstream.api.v1.consumer_create_respo
nse","stream_name":"KV_bucket1","name":"Vc2dPSpp","created":"2025-01-22T10:57:44.741509965Z","config":{"name":"Vc2dPSpp","deliver_policy":"last_per_subject","ack_policy":"none","ack_wait":79
200000000000,"max_deliver":1,"filter_subject":"$KV.bucket1.\u003e","replay_policy":"instant","idle_heartbeat":5000000000,"flow_control":true,"deliver_subject":"_INBOX.e1irHlMk1oZzdvw1JiopRe"
,"inactive_threshold":5000000000,"num_replicas":1,"mem_storage":true},"delivered":{"consumer_seq":0,"stream_seq":0},"ack_floor":{"consumer_seq":0,"stream_seq":0},"num_ack_pending":0,"num_red
elivered":0,"num_waiting":0,"num_pending":1,"ts":"2025-01-22T10:57:44.741664189Z"}
@bredtape bredtape added the defect Suspected defect such as a bug or regression label Jan 22, 2025
@piotrpio piotrpio self-assigned this Jan 23, 2025
@piotrpio
Copy link
Collaborator

Hello @bredtape, thanks for the reproduction, I'll be looking at this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
defect Suspected defect such as a bug or regression
Projects
None yet
Development

No branches or pull requests

2 participants