diff --git a/accesscontroller/orbitdb/accesscontroller_orbitdb.go b/accesscontroller/orbitdb/accesscontroller_orbitdb.go index 390089d0..3af75b2a 100644 --- a/accesscontroller/orbitdb/accesscontroller_orbitdb.go +++ b/accesscontroller/orbitdb/accesscontroller_orbitdb.go @@ -208,8 +208,9 @@ func (o *orbitDBAccessController) Load(ctx context.Context, address string) erro o.kvStore = store + sub := o.kvStore.Subscribe(ctx) go func() { - for e := range o.kvStore.Subscribe(ctx) { + for e := range sub { switch e.(type) { case stores.EventReady, stores.EventWrite, stores.EventReplicated: o.onUpdate(ctx) diff --git a/baseorbitdb/orbitdb.go b/baseorbitdb/orbitdb.go index 0d4dcf08..a2ed5030 100644 --- a/baseorbitdb/orbitdb.go +++ b/baseorbitdb/orbitdb.go @@ -784,8 +784,9 @@ func (o *orbitDB) onClose(addr cid.Cid) error { } func (o *orbitDB) storeListener(ctx context.Context, store Store, topic iface.PubSubTopic) { + sub := store.Subscribe(ctx) go func() { - for evt := range store.Subscribe(ctx) { + for evt := range sub { switch e := evt.(type) { case *stores.EventWrite: o.logger.Debug("received stores.write event") @@ -957,8 +958,9 @@ func (o *orbitDB) exchangeHeads(ctx context.Context, p p2pcore.PeerID, addr addr } func (o *orbitDB) watchOneOnOneMessage(ctx context.Context, channel iface.DirectChannel) { + sub := channel.Subscribe(ctx) go func() { - for evt := range channel.Subscribe(ctx) { + for evt := range sub { o.logger.Debug("received one on one message") switch e := evt.(type) { diff --git a/stores/basestore/base_store.go b/stores/basestore/base_store.go index f99c1a4f..63a34b64 100644 --- a/stores/basestore/base_store.go +++ b/stores/basestore/base_store.go @@ -202,11 +202,12 @@ func (b *BaseStore) InitBaseStore(ctx context.Context, ipfs coreapi.CoreAPI, ide b.options = options + sub := b.Replicator().Subscribe(ctx) go func() { ctx, span := b.tracer.Start(ctx, "base-store-main-loop", trace.WithAttributes(otkv.String("store-address", b.Address().String()))) defer span.End() - for e := range b.Replicator().Subscribe(ctx) { + for e := range sub { switch evt := e.(type) { case *replicator.EventLoadAdded: span.AddEvent(ctx, "replicator-load-added", otkv.String("hash", evt.Hash.String())) diff --git a/tests/persistence_test.go b/tests/persistence_test.go index 166f0aa4..7e09a423 100644 --- a/tests/persistence_test.go +++ b/tests/persistence_test.go @@ -169,9 +169,9 @@ func TestPersistence(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, time.Second*5) defer cancel() - + sub := db.Subscribe(ctx) go func() { - for evt := range db.Subscribe(ctx) { + for evt := range sub { switch evt.(type) { case *stores.EventReady: l.Lock() diff --git a/tests/replicate_automatically_test.go b/tests/replicate_automatically_test.go index 8a8a2bb6..d6b87216 100644 --- a/tests/replicate_automatically_test.go +++ b/tests/replicate_automatically_test.go @@ -122,8 +122,10 @@ func TestReplicateAutomatically(t *testing.T) { defer cancel() hasAllResults := false + + sub := db2.Subscribe(ctx) go func() { - for evt := range db2.Subscribe(ctx) { + for evt := range sub { switch evt.(type) { case *stores.EventReplicated: infinity := -1 @@ -185,8 +187,9 @@ func TestReplicateAutomatically(t *testing.T) { infinity := -1 + sub1 := db4.Subscribe(ctx) go func() { - for event := range db4.Subscribe(ctx) { + for event := range sub1 { switch event.(type) { case *stores.EventReplicated: require.Equal(t, "", "Should not happen") @@ -200,8 +203,9 @@ func TestReplicateAutomatically(t *testing.T) { subCtx, subCancel = context.WithTimeout(ctx, time.Second) defer subCancel() + sub2 := db2.Subscribe(ctx) go func() { - for event := range db2.Subscribe(ctx) { + for event := range sub2 { switch event.(type) { case *stores.EventReplicateProgress: e := event.(*stores.EventReplicateProgress) diff --git a/tests/replication_status_test.go b/tests/replication_status_test.go index a7deed09..1c4a9b9b 100644 --- a/tests/replication_status_test.go +++ b/tests/replication_status_test.go @@ -123,8 +123,9 @@ func TestReplicationStatus(t *testing.T) { subCtx, cancel := context.WithTimeout(ctx, time.Second*5) defer cancel() + sub := db2.Subscribe(subCtx) go func() { - for evt := range db2.Subscribe(subCtx) { + for evt := range sub { if _, ok := evt.(*stores.EventReplicated); ok { if db2.ReplicationStatus().GetBuffered() == 0 && db2.ReplicationStatus().GetQueued() == 0 && diff --git a/tests/write_permissions_test.go b/tests/write_permissions_test.go index fdffb94b..f6b98321 100644 --- a/tests/write_permissions_test.go +++ b/tests/write_permissions_test.go @@ -199,8 +199,9 @@ func TestWritePermissions(t *testing.T) { subCtx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() + sub := db1.Subscribe(ctx) go func() { - for evt := range db1.Subscribe(ctx) { + for evt := range sub { switch evt.(type) { case *stores.EventReplicated: require.Equal(t, "this", "should not occur")