Skip to content

Commit

Permalink
Merge pull request #787 from desci-labs/finetune-x-pub
Browse files Browse the repository at this point in the history
feat: cache external pub, send email notifications and return default
  • Loading branch information
shadrach-tayo authored Jan 27, 2025
2 parents 4c39472 + 8a854de commit 4447841
Show file tree
Hide file tree
Showing 16 changed files with 468 additions and 224 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
// import WebSocket from 'isomorphic-ws';
// import { type WebSocketServer } from 'isomorphic-ws';

import { cbor as cborHelpers, NetworkAdapter, type PeerMetadata, type PeerId } from '@automerge/automerge-repo/slim';
import debug from 'debug';
const log = debug('WebsocketServer');
import { Connection as WebSocket } from 'partyserver';

import { cbor as cborHelpers, NetworkAdapter, type PeerMetadata, type PeerId } from '@automerge/automerge-repo/slim';
import { assert } from './assert.js';
import { FromClientMessage, FromServerMessage, isJoinMessage, isLeaveMessage } from './messages.js';
import { ProtocolV1, ProtocolVersion } from './protocolVersion.js';
import { assert } from './assert.js';
import { toArrayBuffer } from './toArrayBuffer.js';
import { Connection as WebSocket } from 'partyserver';

const log = debug('WebsocketServer');
// import { handleChunked, sendChunked } from './chunking.js';

const { encode, decode } = cborHelpers;
Expand Down
81 changes: 43 additions & 38 deletions desci-repo/src/repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,16 @@ const hostname = os.hostname();

logger.trace({ partyServerHost, partyServerToken, serverName: os.hostname() ?? 'no-hostname' }, 'Env checked');

let config: RepoConfig;
let socket: WebSocketServer;
let config: RepoConfig = {
peerId: `repo-server-${hostname}` as PeerId,
// Since this is a server, we don't share generously — meaning we only sync documents they already
// know about and can ask for by ID.
sharePolicy: async (peerId, documentId) => {
// logger.trace({ peerId, documentId }, 'SharePolicy called');
return true;
},
};
// let socket: WebSocketServer;

if (ENABLE_PARTYKIT_FEATURE) {
config = {
Expand All @@ -48,44 +56,41 @@ if (ENABLE_PARTYKIT_FEATURE) {
},
};
} else {
socket = new WebSocketServer({
port: process.env.WS_PORT ? parseInt(process.env.WS_PORT) : 5445,
path: '/sync',
});

const adapter = new NodeWSServerAdapter(socket);

config = {
network: [adapter],
storage: new PostgresStorageAdapter(),
peerId: `repo-server-${hostname}` as PeerId,
// Since this is a server, we don't share generously — meaning we only sync documents they already
// know about and can ask for by ID.
sharePolicy: async (peerId, documentId) => {
try {
if (!documentId) {
logger.trace({ peerId }, 'SharePolicy: Document ID NOT found');
return false;
}
// peer format: `peer-[user#id]:[unique string combination]
if (peerId.toString().length < 8) {
logger.error({ peerId }, 'SharePolicy: Peer ID invalid');
return false;
}

const userId = peerId.split(':')?.[0]?.split('-')?.[1];
const isAuthorised = await verifyNodeDocumentAccess(Number(userId), documentId);
logger.trace({ peerId, userId, documentId, isAuthorised }, '[SHARE POLICY CALLED]::');
return isAuthorised;
} catch (err) {
logger.error({ err }, 'Error in share policy');
return false;
}
},
};
// socket = new WebSocketServer({
// port: process.env.WS_PORT ? parseInt(process.env.WS_PORT) : 5445,
// path: '/sync',
// });
// const adapter = new NodeWSServerAdapter(socket);
// config = {
// network: [adapter],
// storage: new PostgresStorageAdapter(),
// peerId: `repo-server-${hostname}` as PeerId,
// // Since this is a server, we don't share generously — meaning we only sync documents they already
// // know about and can ask for by ID.
// sharePolicy: async (peerId, documentId) => {
// try {
// if (!documentId) {
// logger.trace({ peerId }, 'SharePolicy: Document ID NOT found');
// return false;
// }
// // peer format: `peer-[user#id]:[unique string combination]
// if (peerId.toString().length < 8) {
// logger.error({ peerId }, 'SharePolicy: Peer ID invalid');
// return false;
// }
// const userId = peerId.split(':')?.[0]?.split('-')?.[1];
// const isAuthorised = await verifyNodeDocumentAccess(Number(userId), documentId);
// logger.trace({ peerId, userId, documentId, isAuthorised }, '[SHARE POLICY CALLED]::');
// return isAuthorised;
// } catch (err) {
// logger.error({ err }, 'Error in share policy');
// return false;
// }
// },
// };
}

export { socket };
// export { socket };

export const backendRepo = new Repo(config);

Expand Down
77 changes: 34 additions & 43 deletions desci-repo/src/server.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,10 @@
import 'reflect-metadata';
import type { Server as HttpServer } from 'http';
import path from 'path';
import { fileURLToPath } from 'url';

import * as Sentry from '@sentry/node';
import { nodeProfilingIntegration } from '@sentry/profiling-node';

const ENABLE_TELEMETRY = process.env.NODE_ENV === 'production';
const IS_DEV = !ENABLE_TELEMETRY;

// @ts-check

import type { Server as HttpServer } from 'http';
import { fileURLToPath } from 'url';

import 'dotenv/config';
import 'reflect-metadata';
import bodyParser from 'body-parser';
Expand All @@ -23,10 +16,10 @@ import { v4 } from 'uuid';

import { als, logger } from './logger.js';
import { RequestWithUser } from './middleware/guard.js';
import { extractAuthToken, extractUserFromToken } from './middleware/permissions.js';
import routes from './routes/index.js';

import { ENABLE_PARTYKIT_FEATURE } from './config.js';
const ENABLE_TELEMETRY = process.env.NODE_ENV === 'production';
const IS_DEV = !ENABLE_TELEMETRY;

const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
Expand Down Expand Up @@ -136,41 +129,39 @@ class AppServer {
logger.info(`Server running on port ${this.port}`);
});

if (!ENABLE_PARTYKIT_FEATURE) {
this.acceptWebsocketConnections();
}
// if (!ENABLE_PARTYKIT_FEATURE) {
// this.acceptWebsocketConnections();
// }
}

async acceptWebsocketConnections() {
const wsSocket = await import('./repo.js').then((x) => x.socket);

wsSocket.on('listening', () => {
logger.info({ module: 'WebSocket SERVER', port: wsSocket.address() }, 'WebSocket Server Listening');
});

wsSocket.on('connection', async (socket, request) => {
try {
logger.info({ module: 'WebSocket' }, 'WebSocket Connection Attempt');
const token = await extractAuthToken(request as Request);
const authUser = await extractUserFromToken(token!);
if (!authUser) {
socket.close(); // Close connection if user is not authorized
return;
}
logger.info(
{ module: 'WebSocket SERVER', id: authUser.id, name: authUser.name },
'WebSocket Connection Authorised',
);
socket.on('message', (message) => {
// Handle incoming messages
// console.log(`Received message: ${message}`);
});
// Additional event listeners (e.g., 'close', 'error') can be set up here
} catch (error) {
socket.close(); // Close the connection in case of an error
logger.error(error, 'Error during WebSocket connection');
}
});
// const wsSocket = await import('./repo.js').then((x) => x.socket);
// wsSocket.on('listening', () => {
// logger.info({ module: 'WebSocket SERVER', port: wsSocket.address() }, 'WebSocket Server Listening');
// });
// wsSocket.on('connection', async (socket, request) => {
// try {
// logger.info({ module: 'WebSocket' }, 'WebSocket Connection Attempt');
// const token = await extractAuthToken(request as Request);
// const authUser = await extractUserFromToken(token!);
// if (!authUser) {
// socket.close(); // Close connection if user is not authorized
// return;
// }
// logger.info(
// { module: 'WebSocket SERVER', id: authUser.id, name: authUser.name },
// 'WebSocket Connection Authorised',
// );
// socket.on('message', (message) => {
// // Handle incoming messages
// // console.log(`Received message: ${message}`);
// });
// // Additional event listeners (e.g., 'close', 'error') can be set up here
// } catch (error) {
// socket.close(); // Close the connection in case of an error
// logger.error(error, 'Error during WebSocket connection');
// }
// });
}

#initSerialiser() {
Expand Down
1 change: 1 addition & 0 deletions desci-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"script:backfill-annotations": "debug=* node --no-warnings --enable-source-maps --loader ts-node/esm ./src/scripts/backfill-annotations.ts",
"script:prune-auth-tokens": "debug=* node --no-warnings --enable-source-maps --loader ts-node/esm ./src/scripts/prune-auth-tokens.ts",
"script:backfill-radar": "debug=* node --no-warnings --enable-source-maps --loader ts-node/esm ./src/scripts/backfill-radar.ts",
"script:pruneExternalPublications": "debug=* node --no-warnings --enable-source-maps --loader ts-node/esm ./src/scripts/pruneExternalPublications.ts",
"build": "rimraf dist && tsc && yarn copy-files; if [ \"$SENTRY_AUTH_TOKEN\" ]; then yarn sentry:sourcemaps; else echo 'SENTRY_AUTH_TOKEN not set, sourcemaps will not upload'; fi",
"build:worker": "cd ../sync-server && ./scripts/build.sh test",
"copy-files": "copyfiles -u 1 src/**/*.cjs dist/",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "ExternalPublications" ADD COLUMN "isVerified" BOOLEAN NOT NULL DEFAULT true;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "ExternalPublications" ADD COLUMN "verifiedAt" TIMESTAMP(3);
10 changes: 6 additions & 4 deletions desci-server/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -986,16 +986,18 @@ model PublishStatus {
}

model ExternalPublications {
id Int @id @default(autoincrement())
id Int @id @default(autoincrement())
uuid String
node Node @relation(fields: [uuid], references: [uuid])
node Node @relation(fields: [uuid], references: [uuid])
score Float
doi String
publisher String
publishYear String
sourceUrl String
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
isVerified Boolean @default(true)
verifiedAt DateTime?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
// @@unique([uuid, publisher])
}
Expand Down
Loading

0 comments on commit 4447841

Please sign in to comment.