Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: stats task added #29

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"@koa/cors": "^3.1.0",
"@koa/router": "10.1.1",
"@types/yargs": "^17.0.7",
"arweave": "1.10.18",
"arweave": "^1.10.23",
"axios": "0.24.0",
"dotenv": "10.0.0",
"knex": "0.95.14",
Expand All @@ -24,26 +24,26 @@
"koa-compress": "^5.1.0",
"node-fetch": "3.1.0",
"pg": "^8.7.1",
"redstone-smartweave": "^0.4.10",
"redstone-smartweave": "^0.4.13",
"yargs": "^17.3.0"
},
"devDependencies": {
"@types/jest": "^27.0.3",
"@types/cli-table": "^0.3.0",
"@types/jest": "^27.0.3",
"@types/koa-bodyparser": "^4.3.4",
"@types/koa__router": "8.0.11",
"@types/node-fetch": "3.0.3",
"@types/object-hash": "2.2.1",
"@typescript-eslint/eslint-plugin": "4.33.0",
"@typescript-eslint/parser": "4.33.0",
"cli-table": "^0.3.11",
"colors": "^1.4.0",
"cross-env": "7.0.3",
"eslint": "8.3.0",
"jest": "^27.4.5",
"prettier": "2.5.0",
"ts-jest": "^27.1.2",
"ts-node-dev": "1.1.8",
"typescript": "4.5.2",
"jest": "^27.4.5",
"cli-table": "^0.3.11",
"colors": "^1.4.0",
"ts-jest": "^27.1.2"
"typescript": "4.5.2"
}
}
2 changes: 2 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// note: the first SW contract was registered at 472808 block height
export const MIN_BLOCK_HEIGHT = 472808;
19 changes: 19 additions & 0 deletions src/db/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,23 @@ export async function initGatewayDb(db: Knex) {
table.string("project", 64).index();
});
}

if (!(await db.schema.hasTable("stats_contracts"))) {
await db.schema.createTable("stats_contracts", (table) => {
table.string("contract_id", 64).primary();
table.string("owner", 64).index();
table.bigInteger("block_height").notNullable().index();
table.string("block_id").notNullable();
table.string("timestamp").index();
table.jsonb("fee");
});
}

if (!(await db.schema.hasTable("stats_tags"))) {
await db.schema.createTable("stats_tags", (table) => {
table.string("contract_id", 64).notNullable().primary();
table.string("name").notNullable().index();
table.text("value").notNullable().index();
});
}
}
1 change: 0 additions & 1 deletion src/gateway/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ export interface GatewayContext {

app.listen(port);
logger.info(`Listening on port ${port}`);

if (!fs.existsSync('gateway.lock')) {
try {
logger.debug(`Creating lock file for ${cluster.worker?.id}`);
Expand Down
12 changes: 8 additions & 4 deletions src/gateway/router/gatewayRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ import Router from "@koa/router";
import {contractsRoute} from "./routes/contractsRoute";
import {interactionsRoute} from "./routes/interactionsRoute";
import {searchRoute} from "./routes/searchRoute";
import {statsRoute} from "./routes/statsRoute";
import {statsTxPerDayRoute} from "./routes/statsTxPerDayRoute";
import {statsTotalInteractionsRoute} from "./routes/stats/statsTotalInteractionsRoute";
import {statsTxPerDayRoute} from "./routes/stats/statsTxPerDayRoute";
import {statsContractsPerMonthRoute} from "./routes/stats/statsContractsPerMonth";
import {statsTagsRoute} from "./routes/stats/statsTagsRoute";
import {contractRoute} from "./routes/contractRoute";
import {interactionRoute} from "./routes/interactionRoute";

Expand All @@ -14,7 +16,9 @@ gatewayRouter.get("/contracts/:id", contractRoute);
gatewayRouter.get("/search/:phrase", searchRoute);
gatewayRouter.get("/interactions", interactionsRoute);
gatewayRouter.get("/interactions/:id", interactionRoute);
gatewayRouter.get("/stats", statsRoute);
gatewayRouter.get("/stats/per-day", statsTxPerDayRoute);
gatewayRouter.get("/stats/total", statsTotalInteractionsRoute);
gatewayRouter.get("/stats/tx-per-day", statsTxPerDayRoute);
gatewayRouter.get("/stats/contracts-per-month", statsContractsPerMonthRoute);
gatewayRouter.get("/stats/tags", statsTagsRoute);

export default gatewayRouter;
29 changes: 29 additions & 0 deletions src/gateway/router/routes/stats/statsContractsPerMonth.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import Router from "@koa/router";
import {Benchmark} from "redstone-smartweave";

export async function statsContractsPerMonthRoute(ctx: Router.RouterContext) {
const {logger, gatewayDb} = ctx;

try {
const benchmark = Benchmark.measure();
const result: any = await gatewayDb.raw(
`
WITH contracts_per_month AS (
SELECT to_timestamp(timestamp::integer) as date, contract_id as contract
FROM stats
)
SELECT DATE_TRUNC('month', date) AS contracts_to_month,
COUNT(contract) AS count
FROM contracts_per_month
GROUP BY DATE_TRUNC('month', date)
ORDER BY DATE_TRUNC('month', date) ASC;
`
);
ctx.body = result?.rows;
logger.debug("Stats loaded in", benchmark.elapsed());
} catch (e: any) {
ctx.logger.error(e);
ctx.status = 500;
ctx.body = {message: e};
}
}
26 changes: 26 additions & 0 deletions src/gateway/router/routes/stats/statsTagsRoute.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import Router from "@koa/router";
import {Benchmark} from "redstone-smartweave";

export async function statsTagsRoute(ctx: Router.RouterContext) {
const {logger, gatewayDb} = ctx;

try {
const benchmark = Benchmark.measure();
const result: any = await gatewayDb.raw(
`
SELECT tg.value as "Content-Type", count(tg.value) as amount
FROM contracts c
JOIN tags tg on tg.contract_id = c.contract_id
WHERE tg.name = 'Content-Type'
GROUP BY tg.value
ORDER BY count(tg.value) desc;
`
);
ctx.body = result?.rows;
logger.debug("Contracts stats loaded in", benchmark.elapsed());
} catch (e: any) {
ctx.logger.error(e);
ctx.status = 500;
ctx.body = {message: e};
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
import Router from "@koa/router";
import {Benchmark} from "redstone-smartweave";

export async function statsRoute(ctx: Router.RouterContext) {
export async function statsTotalInteractionsRoute(ctx: Router.RouterContext) {
const {logger, gatewayDb} = ctx;

const {phrase} = ctx.params;

if (phrase?.length < 3) {
ctx.body = {};
return;
}

try {
const benchmark = Benchmark.measure();
const result: any = await gatewayDb.raw(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,6 @@ import {Benchmark} from "redstone-smartweave";
export async function statsTxPerDayRoute(ctx: Router.RouterContext) {
const {logger, gatewayDb} = ctx;

const {phrase} = ctx.params;

if (phrase?.length < 3) {
ctx.body = [];
return;
}

try {
const benchmark = Benchmark.measure();
const result: any = await gatewayDb.raw(
Expand Down
6 changes: 6 additions & 0 deletions src/gateway/runGateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {runVerifyCorruptedTransactionsTask} from "./tasks/verifyCorruptedTransac
import {runSyncTransactionsTask} from "./tasks/syncTransactions";
import {GatewayContext} from "./init";
import {runContractsMetadataTask} from "./tasks/contractsMetadata";
import {runStatsTask} from "./tasks/stats";


/**
Expand All @@ -29,6 +30,9 @@ import {runContractsMetadataTask} from "./tasks/contractsMetadata";
* re-check the interaction won't be recognized as corrupted - it is returned to the "not processed" pool.
*
* 5. contracts metadata task - loads the contracts metadata (src, init state, owner, etc.)
*
* 6. stats task - loads all the contracts (including all data types, with or without interacions) for statistical purposes,
* runs every 3 hours. Similair logic applied as for blocks sync task - task listens for new blocks and loads new contracts.
*
* note: as there are very little fully synced nodes and they often timeout/504 - this process is a real pain...
*/
Expand All @@ -42,4 +46,6 @@ export async function runGateway(context: GatewayContext) {
await runVerifyCorruptedTransactionsTask(context);

await runContractsMetadataTask(context);

await runStatsTask(context);
}
147 changes: 147 additions & 0 deletions src/gateway/tasks/stats.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import {TaskRunner} from "./TaskRunner";
import {GatewayContext} from "../init";
import {loadPages} from './utils/gqlPageLoading';
import {MIN_BLOCK_HEIGHT} from '../../constants';

const STATS_INTERVAL_MS = 3 * 60 * 60 * 1000;

const QUERY = `
query Transactions($tags: [TagFilter!]!, $blockFilter: BlockFilter!, $first: Int!, $after: String) {
transactions(tags: $tags, block: $blockFilter, first: $first, sort: HEIGHT_ASC, after: $after) {
pageInfo {
hasNextPage
}
edges {
node {
id
owner {
address
}
tags {
name
value
}
fee {
winston
ar
}
data {
type
}
block {
id
timestamp
height
}
}
cursor
}
}
}
`

export async function runStatsTask(context: GatewayContext) {
await TaskRunner
.from("[stats]", loadStats, context)
.runAsyncEvery(STATS_INTERVAL_MS);
}

async function loadStats(context: GatewayContext) {
const {logger, gatewayDb, arweave} = context;

let results: any[];
try {
results = await Promise.allSettled([
gatewayDb("stats_contracts")
.select("block_height")
.orderBy("block_height", "desc")
.limit(1)
.first(),
arweave.network.getInfo(),
]);
} catch (e: any) {
logger.error("Error while checking new blocks", e.message);
return;
}

const rejections = results.filter((r) => {
return r.status === "rejected";
});

if (rejections.length > 0) {
logger.error("Error while processing next block", rejections.map((r) => r.message));
return;
}

const currentNetworkHeight = results[1].value.height;

const lastProcessedBlockHeight = results[0].value?.block_height || MIN_BLOCK_HEIGHT;
logger.debug(`Last processed block height: ${lastProcessedBlockHeight}`);

logger.debug("Network info", {
currentNetworkHeight,
lastProcessedBlockHeight,
});

const heightFrom = parseInt(lastProcessedBlockHeight) - 10;
let heightTo = currentNetworkHeight;
if (heightTo > heightFrom + 7000) {
heightTo = heightFrom + 7000;
}

logger.debug("Loading contracts for blocks", {
heightFrom,
heightTo,
});

const variables = {
tags: [
{
name: 'App-Name',
values: ['SmartWeaveContract']
}
],
blockFilter: {
min: heightFrom,
max: heightTo,
},
first: 100
}

const contracts = await loadPages(context, variables, QUERY);

for (let i = 0; i< contracts.length; i++) {
try {
await gatewayDb("stats_contracts")
.insert({
contract_id: contracts[i].node.id,
owner: contracts[i].node.owner.address,
block_height: contracts[i].node.block.height,
block_id: contracts[i].node.block.id,
timestamp: contracts[i].node.block.timestamp,
fee: contracts[i].node.fee
})
.onConflict("contract_id")
.merge();
} catch (e) {
logger.error("Error while loading contract stats", e);

}

for (let j = 0; j < contracts[i].node.tags.length; j++) {
try {
await gatewayDb("stats_tags")
.insert({
contract_id: contracts[i].node.id,
value: contracts[i].node.tags[j].value,
name: contracts[i].node.tags[j].name
})
.onConflict("contract_id")
.ignore();
} catch (e) {
logger.error("Error while loading tags")
}
}
}

}
Loading