Skip to content

Commit

Permalink
fix: losing events due to late subcribe in go routine (#65)
Browse files Browse the repository at this point in the history
  • Loading branch information
phanhuynhquy authored Aug 21, 2020
1 parent 5ecf08d commit 42556eb
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 11 deletions.
3 changes: 2 additions & 1 deletion accesscontroller/orbitdb/accesscontroller_orbitdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,9 @@ func (o *orbitDBAccessController) Load(ctx context.Context, address string) erro

o.kvStore = store

sub := o.kvStore.Subscribe(ctx)
go func() {
for e := range o.kvStore.Subscribe(ctx) {
for e := range sub {
switch e.(type) {
case stores.EventReady, stores.EventWrite, stores.EventReplicated:
o.onUpdate(ctx)
Expand Down
6 changes: 4 additions & 2 deletions baseorbitdb/orbitdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,8 +784,9 @@ func (o *orbitDB) onClose(addr cid.Cid) error {
}

func (o *orbitDB) storeListener(ctx context.Context, store Store, topic iface.PubSubTopic) {
sub := store.Subscribe(ctx)
go func() {
for evt := range store.Subscribe(ctx) {
for evt := range sub {
switch e := evt.(type) {
case *stores.EventWrite:
o.logger.Debug("received stores.write event")
Expand Down Expand Up @@ -957,8 +958,9 @@ func (o *orbitDB) exchangeHeads(ctx context.Context, p p2pcore.PeerID, addr addr
}

func (o *orbitDB) watchOneOnOneMessage(ctx context.Context, channel iface.DirectChannel) {
sub := channel.Subscribe(ctx)
go func() {
for evt := range channel.Subscribe(ctx) {
for evt := range sub {
o.logger.Debug("received one on one message")

switch e := evt.(type) {
Expand Down
3 changes: 2 additions & 1 deletion stores/basestore/base_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,12 @@ func (b *BaseStore) InitBaseStore(ctx context.Context, ipfs coreapi.CoreAPI, ide

b.options = options

sub := b.Replicator().Subscribe(ctx)
go func() {
ctx, span := b.tracer.Start(ctx, "base-store-main-loop", trace.WithAttributes(otkv.String("store-address", b.Address().String())))
defer span.End()

for e := range b.Replicator().Subscribe(ctx) {
for e := range sub {
switch evt := e.(type) {
case *replicator.EventLoadAdded:
span.AddEvent(ctx, "replicator-load-added", otkv.String("hash", evt.Hash.String()))
Expand Down
4 changes: 2 additions & 2 deletions tests/persistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,9 @@ func TestPersistence(t *testing.T) {

ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()

sub := db.Subscribe(ctx)
go func() {
for evt := range db.Subscribe(ctx) {
for evt := range sub {
switch evt.(type) {
case *stores.EventReady:
l.Lock()
Expand Down
10 changes: 7 additions & 3 deletions tests/replicate_automatically_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,10 @@ func TestReplicateAutomatically(t *testing.T) {
defer cancel()

hasAllResults := false

sub := db2.Subscribe(ctx)
go func() {
for evt := range db2.Subscribe(ctx) {
for evt := range sub {
switch evt.(type) {
case *stores.EventReplicated:
infinity := -1
Expand Down Expand Up @@ -185,8 +187,9 @@ func TestReplicateAutomatically(t *testing.T) {

infinity := -1

sub1 := db4.Subscribe(ctx)
go func() {
for event := range db4.Subscribe(ctx) {
for event := range sub1 {
switch event.(type) {
case *stores.EventReplicated:
require.Equal(t, "", "Should not happen")
Expand All @@ -200,8 +203,9 @@ func TestReplicateAutomatically(t *testing.T) {
subCtx, subCancel = context.WithTimeout(ctx, time.Second)
defer subCancel()

sub2 := db2.Subscribe(ctx)
go func() {
for event := range db2.Subscribe(ctx) {
for event := range sub2 {
switch event.(type) {
case *stores.EventReplicateProgress:
e := event.(*stores.EventReplicateProgress)
Expand Down
3 changes: 2 additions & 1 deletion tests/replication_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,9 @@ func TestReplicationStatus(t *testing.T) {
subCtx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()

sub := db2.Subscribe(subCtx)
go func() {
for evt := range db2.Subscribe(subCtx) {
for evt := range sub {
if _, ok := evt.(*stores.EventReplicated); ok {
if db2.ReplicationStatus().GetBuffered() == 0 &&
db2.ReplicationStatus().GetQueued() == 0 &&
Expand Down
3 changes: 2 additions & 1 deletion tests/write_permissions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,9 @@ func TestWritePermissions(t *testing.T) {
subCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

sub := db1.Subscribe(ctx)
go func() {
for evt := range db1.Subscribe(ctx) {
for evt := range sub {
switch evt.(type) {
case *stores.EventReplicated:
require.Equal(t, "this", "should not occur")
Expand Down

0 comments on commit 42556eb

Please sign in to comment.