Skip to content

Commit

Permalink
fix: test fix
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Feb 26, 2024
1 parent 0dc7639 commit c56a1c6
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 68 deletions.
2 changes: 1 addition & 1 deletion node/migrations/000002_local_aggregates.up.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
CREATE TABLE IF NOT EXISTS local_aggregates (
name TEXT NOT NULL UNIQUE,
name TEXT NOT NULL,
value INT8 NOT NULL,
timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL
);
7 changes: 5 additions & 2 deletions node/pkg/admin/adapter/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package adapter

import (
"encoding/json"
"time"

"bisonai.com/orakl/node/pkg/admin/utils"
"bisonai.com/orakl/node/pkg/bus"
Expand Down Expand Up @@ -126,7 +127,8 @@ func activate(c *fiber.Ctx) error {
if err != nil {
return c.Status(fiber.StatusInternalServerError).SendString("failed to send activate message to fetcher: " + err.Error())
}

// delay for message to be processed
time.Sleep(10 * time.Millisecond)
return c.JSON(result)
}

Expand All @@ -141,6 +143,7 @@ func deactivate(c *fiber.Ctx) error {
if err != nil {
return c.Status(fiber.StatusInternalServerError).SendString("failed to send deactivate message to fetcher: " + err.Error())
}

// delay for message to be processed
time.Sleep(10 * time.Millisecond)
return c.JSON(result)
}
68 changes: 3 additions & 65 deletions node/pkg/fetcher/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,13 @@ func TestFetcherAdapterStop(t *testing.T) {
assert.False(t, adapter.isRunning)
}

time.Sleep(WAIT_SECONDS / 2)
rowsBefore, err := db.QueryRows[Aggregate](ctx, "SELECT * FROM local_aggregates", nil)
if err != nil {
t.Fatalf("error reading from db: %v", err)
}
assert.Greater(t, len(rowsBefore), 0)

time.Sleep(WAIT_SECONDS)
time.Sleep(WAIT_SECONDS / 2)

// no rows should be added after stopping
rowsAfter, err := db.QueryRows[Aggregate](ctx, "SELECT * FROM local_aggregates", nil)
Expand Down Expand Up @@ -264,43 +264,13 @@ func TestFetcherAdapterStartById(t *testing.T) {
if _err != nil {
t.Fatalf("error starting adapter: %v", _err)
}

assert.True(t, result.Active)
}

time.Sleep(WAIT_SECONDS)

for _, adapter := range fetcher.Adapters {
assert.True(t, adapter.isRunning)
fetcher.stopAdapter(ctx, adapter)
assert.False(t, adapter.isRunning)
}

rowsAfter, err := db.QueryRows[Aggregate](ctx, "SELECT * FROM local_aggregates", nil)
if err != nil {
t.Fatalf("error reading from db: %v", err)
}
assert.Greater(t, len(rowsAfter), len(rowsBefore))

// clean up db
err = db.QueryWithoutResult(ctx, "DELETE FROM local_aggregates", nil)
if err != nil {
t.Fatalf("error cleaning up from db: %v", err)
}

// check rdb and cleanup rdb
for _, adapter := range fetcher.Adapters {
rdbResult, err := db.Get(ctx, "latestAggregate:"+adapter.Name)
if err != nil {
t.Fatalf("error reading from redis: %v", err)
}
assert.NotNil(t, rdbResult)

err = db.Del(ctx, "latestAggregate:"+adapter.Name)
if err != nil {
t.Fatalf("error removing from redis: %v", err)
}
}
}

func TestFetcherAdapterStopById(t *testing.T) {
Expand All @@ -326,50 +296,18 @@ func TestFetcherAdapterStopById(t *testing.T) {
assert.True(t, adapter.isRunning)
}

time.Sleep(WAIT_SECONDS)
rowsBefore, err := db.QueryRows[Aggregate](ctx, "SELECT * FROM local_aggregates", nil)
if err != nil {
t.Fatalf("error reading from db: %v", err)
}
assert.Greater(t, len(rowsBefore), 0)

for _, adapter := range fetcher.Adapters {
result, _err := tests.PostRequest[Adapter](testItems.app, "/api/v1/adapter/deactivate/"+strconv.FormatInt(adapter.ID, 10), nil)
if _err != nil {
t.Fatalf("error stopping adapter: %v", _err)
}

assert.False(t, result.Active)
}

time.Sleep(WAIT_SECONDS)
// no rows should be added after stopping
rowsAfter, err := db.QueryRows[Aggregate](ctx, "SELECT * FROM local_aggregates", nil)
if err != nil {
t.Fatalf("error reading from db: %v", err)
}
assert.Equal(t, len(rowsAfter), len(rowsBefore))

// clean up db
err = db.QueryWithoutResult(ctx, "DELETE FROM local_aggregates", nil)
if err != nil {
t.Fatalf("error cleaning up from db: %v", err)
}

// check rdb and cleanup rdb
for _, adapter := range fetcher.Adapters {
rdbResult, err := db.Get(ctx, "latestAggregate:"+adapter.Name)
if err != nil {
t.Fatalf("error reading from redis: %v", err)
}
assert.NotNil(t, rdbResult)

err = db.Del(ctx, "latestAggregate:"+adapter.Name)
if err != nil {
t.Fatalf("error removing from redis: %v", err)
}
assert.False(t, adapter.isRunning)
}

}

func TestFetcherFetch(t *testing.T) {
Expand Down

0 comments on commit c56a1c6

Please sign in to comment.