diff --git a/packages/core/src/events/m.room.power_levels.spec.ts b/packages/core/src/events/m.room.power_levels.spec.ts index bc311b44..7a613b74 100644 --- a/packages/core/src/events/m.room.power_levels.spec.ts +++ b/packages/core/src/events/m.room.power_levels.spec.ts @@ -63,8 +63,7 @@ test("roomPowerLevelsEvent", async () => { ], depth: 3, prev_events: ["$tZRt2bwceX4sG913Ee67tJiwe-gk859kY2mCeYSncw8"], - sender: "@admin:hs1", - member: "@asd6:rc1", + members: ["@admin:hs1", "@asd6:rc1"], ts: 1733107418713, }); diff --git a/packages/core/src/events/m.room.power_levels.ts b/packages/core/src/events/m.room.power_levels.ts index c3c9d2a8..96607c95 100644 --- a/packages/core/src/events/m.room.power_levels.ts +++ b/packages/core/src/events/m.room.power_levels.ts @@ -31,21 +31,20 @@ interface RoomPowerLevelsEvent extends EventBase { export const roomPowerLevelsEvent = ({ roomId, - sender, - member, + members: usernames, auth_events, prev_events, depth, ts = Date.now(), }: { roomId: string; - sender: string; - member: string; + members: [sender: string, ...member: string[]]; auth_events: string[]; prev_events: string[]; depth: number; ts?: number; }) => { + const [sender, ...members] = usernames; return createEventBase("m.room.power_levels", { roomId, sender, @@ -54,7 +53,10 @@ export const roomPowerLevelsEvent = ({ depth, ts, content: { - users: { [sender]: 100, [member]: 100 }, + users: { + [sender]: 100, + ...Object.fromEntries(usernames.map((member) => [member, 100])), + }, users_default: 0, events: { "m.room.name": 50, diff --git a/packages/fake/src/room.ts b/packages/fake/src/room.ts index 3281d650..7f206603 100644 --- a/packages/fake/src/room.ts +++ b/packages/fake/src/room.ts @@ -46,8 +46,7 @@ export const fakeEndpoints = new Elysia({ prefix: "/fake" }) } const { roomId, events } = await createRoom( - sender, - username, + [sender, username], createSignedEvent(config.signingKey[0], config.name), `!${createMediaId(18)}:${config.name}`, ); @@ -112,8 +111,7 @@ export const fakeEndpoints = new Elysia({ prefix: "/fake" }) } const { roomId: newRoomId, events } = await createRoom( - sender, - username, + [sender, username], createSignedEvent(config.signingKey[0], config.name), `!${createMediaId(18)}:${config.name}`, ); diff --git a/packages/homeserver/src/authentication.spec.ts b/packages/homeserver/src/authentication.spec.ts index 0b95ff0b..d7c7c387 100644 --- a/packages/homeserver/src/authentication.spec.ts +++ b/packages/homeserver/src/authentication.spec.ts @@ -1,7 +1,7 @@ import { describe, expect, test } from "bun:test"; import { - computeHash, + computeAndMergeHash, extractSignaturesFromHeader, generateId, signRequest, @@ -142,7 +142,7 @@ test("signRequest", async () => { }); test("computeHash", async () => { - const result = computeHash({ + const result = computeAndMergeHash({ auth_events: [ "$e0YmwnKseuHqsuF50ekjta7z5UpO-bDoq7y4R1NKMpI", "$6_VX-xW821oaBwOuaaV_xoC6fD2iMg2QPWD4J7Bh3o4", diff --git a/packages/homeserver/src/authentication.ts b/packages/homeserver/src/authentication.ts index 00b7dc7a..65a42f29 100644 --- a/packages/homeserver/src/authentication.ts +++ b/packages/homeserver/src/authentication.ts @@ -132,7 +132,7 @@ export type HashedEvent> = T & { }; }; -export function computeHash>( +export function computeAndMergeHash>( content: T, ): HashedEvent { // remove the fields that are not part of the hash @@ -146,19 +146,39 @@ export function computeHash>( ...toHash } = content as any; + const [algorithm, hash] = computeHash(toHash); + return { ...content, hashes: { - sha256: toUnpaddedBase64( - crypto - .createHash("sha256") - .update(encodeCanonicalJson(toHash)) - .digest(), - ), + [algorithm]: hash, }, }; } +export function computeHash>( + content: T, + algorithm: "sha256" = "sha256", +): ["sha256", string] { + // remove the fields that are not part of the hash + const { + age_ts, + unsigned, + signatures, + hashes, + outlier, + destinations, + ...toHash + } = content as any; + + return [ + algorithm, + toUnpaddedBase64( + crypto.createHash(algorithm).update(encodeCanonicalJson(toHash)).digest(), + ), + ]; +} + export function generateId(content: T): string { // remove the fields that are not part of the hash const { age_ts, unsigned, signatures, ...toHash } = pruneEventDict( diff --git a/packages/homeserver/src/fixtures/ContextBuilder.ts b/packages/homeserver/src/fixtures/ContextBuilder.ts index c998e197..743dd6c4 100644 --- a/packages/homeserver/src/fixtures/ContextBuilder.ts +++ b/packages/homeserver/src/fixtures/ContextBuilder.ts @@ -1,4 +1,6 @@ import Elysia from "elysia"; +import Crypto from "node:crypto"; + import { type SigningKey, generateKeyPairsFromString } from "../keys"; import { toUnpaddedBase64 } from "../binaryData"; import type { @@ -9,6 +11,8 @@ import { authorizationHeaders, generateId } from "../authentication"; import type { HomeServerRoutes } from "../app"; import type { EventBase } from "@hs/core/src/events/eventBase"; import type { EventStore } from "../plugins/mongodb"; +import { createRoom } from "../procedures/createRoom"; +import { createSignedEvent } from "@hs/core/src/events/utils/createSignedEvent"; type MockedFakeRequest = < M extends HomeServerRoutes["method"], @@ -19,6 +23,28 @@ type MockedFakeRequest = < body: getAllResponsesByPath["body"], ) => Promise; +export function createMediaId(length: number) { + const characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + let result = ""; + for (let i = 0; i < length; i++) { + const randomIndex = Crypto.randomInt(0, characters.length); + result += characters[randomIndex]; + } + return result; +} + +class MockedRoom { + private events: EventStore[] = []; + constructor( + private roomId: string, + events: EventStore[], + ) { + for (const event of events) { + this.events.push(event); + } + } +} + export class ContextBuilder { private config: any; private mongo: any; @@ -86,6 +112,7 @@ export class ContextBuilder { app: Elysia; instance: ContextBuilder; makeRequest: MockedFakeRequest; + createRoom: (sender: string, ...members: string[]) => Promise; }> { const signature = await generateKeyPairsFromString(this.signingSeed); @@ -155,12 +182,25 @@ export class ContextBuilder { body: body && JSON.stringify(body), }); }; + return { signature, name: this.name, app, instance: this, makeRequest, + createRoom: async (sender: string, ...members: string[]) => { + const { roomId, events } = await createRoom( + [sender, ...members], + createSignedEvent(config.signingKey[0], config.name), + `!${createMediaId(18)}:${config.name}`, + ); + + for (const { event } of events) { + this.withEvent(roomId, event); + } + return new MockedRoom(roomId, events); + }, }; } } diff --git a/packages/homeserver/src/makeRequest.ts b/packages/homeserver/src/makeRequest.ts index ab0b2c6c..b3daa56a 100644 --- a/packages/homeserver/src/makeRequest.ts +++ b/packages/homeserver/src/makeRequest.ts @@ -1,5 +1,5 @@ import type { HomeServerRoutes } from "./app"; -import { authorizationHeaders, computeHash } from "./authentication"; +import { authorizationHeaders, computeAndMergeHash } from "./authentication"; import { resolveHostAddressByServerName } from "./helpers/server-discovery/discovery"; import { extractURIfromURL } from "./helpers/url"; import type { SigningKey } from "./keys"; @@ -50,7 +50,7 @@ export const makeSignedRequest = async < const signedBody = body && (await signJson( - computeHash({ ...body, signatures: {} }), + computeAndMergeHash({ ...body, signatures: {} }), signingKey, signingName, )); diff --git a/packages/homeserver/src/mutex/Mutex.ts b/packages/homeserver/src/mutex/Mutex.ts index c1b7222f..78851cc4 100644 --- a/packages/homeserver/src/mutex/Mutex.ts +++ b/packages/homeserver/src/mutex/Mutex.ts @@ -1,7 +1,14 @@ export class Mutex { private map: Map = new Map(); - public async request(scope: string) { + + public async request(scope: string, fail: true): Promise; + + public async request(scope: string): Promise; + public async request(scope: string, fail?: true) { if (this.map.has(scope)) { + if (fail) { + throw new Error("Mutex already locked"); + } return false; } @@ -20,4 +27,8 @@ export class Lock { public async release() { this.unlock(); } + + [Symbol.dispose]() { + this.release(); + } } diff --git a/packages/homeserver/src/plugins/mongodb.ts b/packages/homeserver/src/plugins/mongodb.ts index 220d2098..0248eaa1 100644 --- a/packages/homeserver/src/plugins/mongodb.ts +++ b/packages/homeserver/src/plugins/mongodb.ts @@ -23,12 +23,18 @@ export interface Server { // }[]; } +interface Room { + _id: string; + state: EventBase[]; +} + export const routerWithMongodb = (db: Db) => new Elysia().decorate( "mongo", (() => { const eventsCollection = db.collection("events"); const serversCollection = db.collection("servers"); + const roomsCollection = db.collection("rooms"); const getLastEvent = async (roomId: string) => { return eventsCollection.findOne( @@ -37,6 +43,24 @@ export const routerWithMongodb = (db: Db) => ); }; + const upsertRoom = async (roomId: string, state: EventBase[]) => { + await roomsCollection.findOneAndUpdate( + { _id: roomId }, + { + $set: { + _id: roomId, + state, + }, + }, + { upsert: true }, + ); + }; + + const getEventsByIds = async (roomId: string, eventIds: string[]) => { + return eventsCollection + .find({ "event.room_id": roomId, "event._id": { $in: eventIds } }) + .toArray(); + }; const getDeepEarliestAndLatestEvents = async ( roomId: string, earliest_events: string[], @@ -165,6 +189,30 @@ export const routerWithMongodb = (db: Db) => return id; }; + const createEvent = async (event: EventBase) => { + const id = generateId(event); + await eventsCollection.insertOne({ + _id: id, + event, + }); + + return id; + }; + + const removeEventFromStaged = async (roomId: string, id: string) => { + await eventsCollection.updateOne( + { _id: id, "event.room_id": roomId }, + { $unset: { staged: 1 } }, + ); + }; + + const getOldestStagedEvent = async (roomId: string) => { + return eventsCollection.findOne( + { staged: true, "event.room_id": roomId }, + { sort: { "event.origin_server_ts": 1 } }, + ); + }; + return { serversCollection, getValidPublicKeyFromLocal, @@ -175,7 +223,13 @@ export const routerWithMongodb = (db: Db) => getMissingEventsByDeep, getLastEvent, getAuthEvents, + + removeEventFromStaged, + getEventsByIds, + getOldestStagedEvent, createStagingEvent, + createEvent, + upsertRoom, }; })(), ); diff --git a/packages/homeserver/src/procedures/createRoom.spec.ts b/packages/homeserver/src/procedures/createRoom.spec.ts index c8e12b3c..c22196d4 100644 --- a/packages/homeserver/src/procedures/createRoom.spec.ts +++ b/packages/homeserver/src/procedures/createRoom.spec.ts @@ -11,8 +11,7 @@ test("createRoom", async () => { const makeSignedEvent = createSignedEvent(signature, "hs1"); const { roomId, events } = await createRoom( - "@sender:hs1", - "username", + ["@sender:hs1", "@username:hs1"], makeSignedEvent, "!roomId:hs1", ); diff --git a/packages/homeserver/src/procedures/createRoom.ts b/packages/homeserver/src/procedures/createRoom.ts index e1809ec5..bd6133dc 100644 --- a/packages/homeserver/src/procedures/createRoom.ts +++ b/packages/homeserver/src/procedures/createRoom.ts @@ -10,8 +10,7 @@ import { createRoomHistoryVisibilityEvent } from "@hs/core/src/events/m.room.his import { createRoomGuestAccessEvent } from "@hs/core/src/events/m.room.guest_access"; export const createRoom = async ( - sender: string, - username: string, + users: [sender: string, ...username: string[]], makeSignedEvent: ReturnType, roomId: string, ): Promise<{ @@ -23,6 +22,8 @@ export const createRoom = async ( }> => { // Create + const [sender, ...members] = users; + const createRoomSigned = createRoomCreateEvent(makeSignedEvent); const createMemberRoomSigned = createRoomMemberEvent(makeSignedEvent); @@ -62,8 +63,7 @@ export const createRoom = async ( const powerLevelsEvent = await createPowerLevelsRoomSigned({ roomId, - sender, - member: username, + members: [sender, ...members], auth_events: [createEvent._id, memberEvent._id], prev_events: [memberEvent._id], depth: 3, diff --git a/packages/homeserver/src/procedures/getPublicKeyFromServer.ts b/packages/homeserver/src/procedures/getPublicKeyFromServer.ts index ded92745..e6b0c616 100644 --- a/packages/homeserver/src/procedures/getPublicKeyFromServer.ts +++ b/packages/homeserver/src/procedures/getPublicKeyFromServer.ts @@ -9,6 +9,7 @@ export const makeGetPublicKeyFromServerProcedure = ( getFromLocal: (origin: string, key: string) => Promise, getFromOrigin: ( origin: string, + key: string, ) => Promise<{ key: string; validUntil: number }>, store: ( origin: string, @@ -23,7 +24,10 @@ export const makeGetPublicKeyFromServerProcedure = ( return localPublicKey; } - const { key: remotePublicKey, validUntil } = await getFromOrigin(origin); + const { key: remotePublicKey, validUntil } = await getFromOrigin( + origin, + key, + ); if (remotePublicKey) { await store(origin, key, remotePublicKey, validUntil); return remotePublicKey; @@ -35,14 +39,14 @@ export const makeGetPublicKeyFromServerProcedure = ( export const getPublicKeyFromRemoteServer = async ( domain: string, - signingName: string, + origin: string, algorithmAndVersion: string, ) => { const result = await makeRequest({ method: "GET", domain, uri: "/_matrix/key/v2/server", - signingName, + signingName: origin, }); if (result.valid_until_ts < Date.now()) { throw new Error("Expired remote public key"); diff --git a/packages/homeserver/src/procedures/processPDU.ts b/packages/homeserver/src/procedures/processPDU.ts new file mode 100644 index 00000000..ba5fcdad --- /dev/null +++ b/packages/homeserver/src/procedures/processPDU.ts @@ -0,0 +1,47 @@ +import type { EventBase } from "@hs/core/src/events/eventBase"; +import type { SignedJson } from "../signJson"; +import type { HashedEvent } from "../authentication"; +import type { EventStore } from "../plugins/mongodb"; + +export const processPDUsByRoomId = async ( + roomId: string, + pdus: SignedJson>[], + validatePdu: (pdu: SignedJson>) => Promise, + getEventsByIds: (roomId: string, eventIds: string[]) => Promise, + createStagingEvent: (event: EventBase) => Promise, + createEvent: (event: EventBase) => Promise, + processMissingEvents: (roomId: string) => Promise, + generateId: (pdu: SignedJson>) => string, +) => { + const resultPDUs = {} as { + [key: string]: Record; + }; + for (const pdu of pdus) { + const pid = generateId(pdu); + try { + await validatePdu(pdu); + resultPDUs[pid] = {}; + + const events = await getEventsByIds(roomId, pdu.prev_events); + + const missing = pdu.prev_events.filter( + (event) => !events.find((e) => e._id === event), + ); + + if (!missing.length) { + await createStagingEvent(pdu); + } else { + await createEvent(pdu); + } + } catch (error) { + resultPDUs[pid] = { error } as any; + } + void (async () => { + while (await processMissingEvents(roomId)); + })(); + } + + return { + pdus: resultPDUs, + }; +}; diff --git a/packages/homeserver/src/routes/federation/checkSignAndHashes.ts b/packages/homeserver/src/routes/federation/checkSignAndHashes.ts new file mode 100644 index 00000000..228da005 --- /dev/null +++ b/packages/homeserver/src/routes/federation/checkSignAndHashes.ts @@ -0,0 +1,47 @@ +import { computeHash, type HashedEvent } from "../../authentication"; +import type { EventBase } from "@hs/core/src/events/eventBase"; +import { + getSignaturesFromRemote, + verifyJsonSignature, + type SignedJson, +} from "../../signJson"; +import { pruneEventDict } from "../../pruneEventDict"; +import { MatrixError } from "../../errors"; + +export async function checkSignAndHashes( + pdu: SignedJson>, + origin: string, + getPublicKeyFromServer: (origin: string, key: string) => Promise, +) { + const { hashes, ...rest } = pdu; + + const [signature] = await getSignaturesFromRemote(pdu, origin); + + const publicKey = await getPublicKeyFromServer( + origin, + `${signature.algorithm}:${signature.version}`, + ); + + if ( + !verifyJsonSignature( + pruneEventDict(pdu), + origin, + Uint8Array.from(atob(signature.signature), (c) => c.charCodeAt(0)), + Uint8Array.from(atob(publicKey), (c) => c.charCodeAt(0)), + signature.algorithm, + signature.version, + ) + ) { + throw new MatrixError("400", "Invalid signature"); + } + + const [algorithm, hash] = computeHash(pdu); + + const expectedHash = pdu.hashes[algorithm]; + + if (hash !== expectedHash) { + throw new MatrixError("400", "Invalid hash"); + } + + return true; +} diff --git a/packages/homeserver/src/routes/federation/sendInviteV2.ts b/packages/homeserver/src/routes/federation/sendInviteV2.ts index d1357f40..b901b86e 100644 --- a/packages/homeserver/src/routes/federation/sendInviteV2.ts +++ b/packages/homeserver/src/routes/federation/sendInviteV2.ts @@ -4,9 +4,18 @@ import { InviteEventDTO } from "../../dto"; import { StrippedStateDTO } from "../../dto"; import { ErrorDTO } from "../../dto"; import { makeSignedRequest } from "../../makeRequest"; -import { generateId } from "../../authentication"; +import { type HashedEvent, generateId } from "../../authentication"; import { isMongodbContext } from "../../plugins/isMongodbContext"; import { isConfigContext } from "../../plugins/isConfigContext"; +import { MatrixError } from "../../errors"; + +import type { SignedJson } from "../../signJson"; +import type { EventBase } from "@hs/core/src/events/eventBase"; +import { + getPublicKeyFromRemoteServer, + makeGetPublicKeyFromServerProcedure, +} from "../../procedures/getPublicKeyFromServer"; +import { checkSignAndHashes } from "./checkSignAndHashes"; export const sendInviteV2Route = new Elysia().put( "/invite/:roomId/:eventId", @@ -19,7 +28,7 @@ export const sendInviteV2Route = new Elysia().put( } const { config, - mongo: { eventsCollection }, + mongo: { eventsCollection, upsertRoom }, } = context; console.log("invite received ->", { params, body }); @@ -73,17 +82,91 @@ export const sendInviteV2Route = new Elysia().put( console.log("send_join response ->", { responseBody }); - if (responseBody.event) { + const { event: pdu, origin } = responseBody; + + const createEvent = responseBody.state.find( + (event) => event.type === "m.room.create", + ); + + if (!createEvent) { + throw new MatrixError("400", "Invalid response"); + } + + if (pdu) { await eventsCollection.insertOne({ _id: generateId(responseBody.event), event: responseBody.event, }); } + + const auth_chain = new Map( + responseBody.auth_chain.map((event) => [generateId(event), event]), + ); + + const state = new Map( + responseBody.state.map((event) => [generateId(event), event]), + ); + + const getPublicKeyFromServer = makeGetPublicKeyFromServerProcedure( + context.mongo.getValidPublicKeyFromLocal, + (origin, key) => getPublicKeyFromRemoteServer(origin, config.name, key), + + context.mongo.storePublicKey, + ); + + const validPDUs = new Map(); + + for await (const [eventId, event] of [ + ...auth_chain.entries(), + ...state.entries(), + ]) { + // check sign and hash of event + if ( + await checkSignAndHashes( + event as SignedJson>, + event.origin, + getPublicKeyFromServer, + ).catch((e) => { + console.log("Error checking signature", e); + return false; + }) + ) { + validPDUs.set(eventId, event); + } else { + console.log("Invalid event", event); + } + } + + const signedAuthChain = [...auth_chain.entries()].filter(([eventId]) => + validPDUs.has(eventId), + ); + + const signedState = [...state.entries()].filter(([eventId]) => + validPDUs.has(eventId), + ); + + const signedCreateEvent = signedAuthChain.find( + ([, event]) => event.type === "m.room.create", + ); + + if (!signedCreateEvent) { + console.log("Invalid create event", validPDUs); + throw new MatrixError( + "400", + "Unexpected create event(s) in auth chain", + ); + } + + await upsertRoom( + signedCreateEvent[1].room_id, + signedState.map(([, event]) => event), + ); + await Promise.all( - responseBody.state?.map((event) => { + signedState.map(([eventId, event]) => { const promise = eventsCollection .insertOne({ - _id: generateId(event), + _id: eventId, event, }) .catch((e) => { diff --git a/packages/homeserver/src/routes/federation/sendJoinV2.ts b/packages/homeserver/src/routes/federation/sendJoinV2.ts index 1507ba19..be8f0786 100644 --- a/packages/homeserver/src/routes/federation/sendJoinV2.ts +++ b/packages/homeserver/src/routes/federation/sendJoinV2.ts @@ -3,6 +3,9 @@ import { Elysia, t } from "elysia"; import { isConfigContext } from "../../plugins/isConfigContext"; import { isMongodbContext } from "../../plugins/isMongodbContext"; import { isRoomMemberEvent } from "@hs/core/src/events/m.room.member"; +import type { SignedEvent } from "../../signJson"; +import type { EventBase } from "@hs/core/src/events/eventBase"; +import type { HashedEvent } from "../../authentication"; // PUT uri: `/_matrix/federation/v1/send_join/${params.roomId}/${event.state_key}?omit_members=true`, @@ -22,7 +25,7 @@ export const sendJoinV2Route = new Elysia().put( const roomId = decodeURIComponent(params.roomId); const stateKey = decodeURIComponent(params.stateKey); - const event = body as any; + const event = body as SignedEvent>; console.log("sendJoin ->", { roomId, stateKey }); console.log("sendJoin ->", { body }); @@ -51,13 +54,13 @@ export const sendJoinV2Route = new Elysia().put( prev_content: lastInviteEvent.event.content, prev_sender: lastInviteEvent.event.sender, }, - }, + } as SignedEvent>, state: events, auth_chain: events.filter((event) => event.depth <= 4), // auth_chain: [], members_omitted: false, origin: config.name, - }; + } as const; console.log("sendJoin result ->", result); diff --git a/packages/homeserver/src/routes/federation/sendTransaction.spec.ts b/packages/homeserver/src/routes/federation/sendTransaction.spec.ts index 0cc68c09..1be09cd8 100644 --- a/packages/homeserver/src/routes/federation/sendTransaction.spec.ts +++ b/packages/homeserver/src/routes/federation/sendTransaction.spec.ts @@ -1,112 +1,82 @@ import { beforeAll, describe, expect, it } from "bun:test"; -import Elysia from "elysia"; +import type Elysia from "elysia"; import { type SigningKey, generateKeyPairsFromString } from "../../keys"; import { toUnpaddedBase64 } from "../../binaryData"; import { sendTransactionRoute } from "./sendTransaction"; import { signJson } from "../../signJson"; import { signEvent } from "../../signEvent"; import { authorizationHeaders, generateId } from "../../authentication"; +import { type ContextBuilder, hs1, rc1 } from "../../fixtures/ContextBuilder"; +import { pruneEventDict } from "../../pruneEventDict"; describe("/send/:txnId", () => { describe("PDU validation", () => { let app: Elysia; let signature: SigningKey; + let hs1Content: Awaited>; + let rc1Content: Awaited>; + beforeAll(async () => { - signature = await generateKeyPairsFromString( - "ed25519 a_yNbw tBD7FfjyBHgT4TwhwzvyS9Dq2Z9ck38RRQKaZ6Sz2z8", + hs1Content = await hs1.build(); + rc1.withLocalSigningKey("hs1", hs1Content.signature); + rc1Content = await rc1.build(); + hs1.withLocalSigningKey("rc1", rc1Content.signature); + hs1Content = await hs1.build(); + + signature = rc1Content.signature; + app = rc1Content.app.group("/_matrix/federation/v1", (app) => + app.use(sendTransactionRoute), ); - - app = new Elysia() - .decorate("config", { - path: "./config.json", - signingKeyPath: "./keys/ed25519.signing.key", - port: 8080, - signingKey: [ - await generateKeyPairsFromString( - "ed25519 a_XRhW YjbSyfqQeGto+OFswt+XwtJUUooHXH5w+czSgawN63U", - ), - ], - name: "synapse2", - version: "org.matrix.msc3757.10", - }) - .decorate("mongo", { - getValidPublicKeyFromLocal: async () => { - return toUnpaddedBase64(signature.publicKey); - }, - storePublicKey: async () => { - return; - }, - eventsCollection: { - findOne: async () => { - return; - }, - findOneAndUpdate: async () => { - return; - }, - insertMany: async () => { - return; - }, - }, - createStagingEvent: async () => { - return; - }, - serversCollection: { - findOne: async () => { - return; - }, - } as any, - }) - .use(sendTransactionRoute); }); it("Should reject if there is more than 100 edus", async () => { + const transactionid = "asd"; + const resp = await app.handle( - new Request("https://localhost/send/txnId", { - headers: { - authorization: "Bearer invalid", - "content-type": "application/json", - }, - method: "PUT", - body: JSON.stringify({ - edus: Array.from({ length: 101 }, (_, i) => ({ - content: { - membership: "join", - avatar_url: null, - displayname: "rodrigo2", - }, - origin: "synapse2", - origin_server_ts: 1664987618773, - sender: "@rodrigo2:synapse2", - unsigned: { - age: 2, - }, - })), - }), + await hs1Content.makeRequest< + "PUT", + `/_matrix/federation/v1/send/${string}` + >("PUT", `/_matrix/federation/v1/send/${transactionid}`, { + edus: Array.from({ length: 101 }, (_, i) => ({ + content: { + membership: "join", + avatar_url: null, + displayname: "rodrigo2", + }, + origin: "hs1", + origin_server_ts: 1664987618773, + sender: "@rodrigo2:hs1", + unsigned: { + age: 2, + }, + })), }), ); - const data = await resp.json(); expect(resp.status).toBe(400); }); it("Should pass if there a proper pdu is provided", async () => { - const signature = await generateKeyPairsFromString( - "ed25519 a_yNbw tBD7FfjyBHgT4TwhwzvyS9Dq2Z9ck38RRQKaZ6Sz2z8", + rc1Content = await rc1.build(); + app = rc1Content.app.group("/_matrix/federation/v1", (app) => + app.use(sendTransactionRoute), ); + const transactionid = "asd"; + const pdu = { - event_id: "1664987618773:synapse2", - room_id: "!room:synapse2", + event_id: "1664987618773:hs1", + room_id: "!room:hs1", type: "m.room.member", content: { membership: "join", avatar_url: null, displayname: "rodrigo2", }, - origin: "synapse2", + origin: "hs1", origin_server_ts: 1664987618773, - sender: "@rodrigo2:synapse2", + sender: "@rodrigo2:hs1", unsigned: { age: 2, }, @@ -119,18 +89,14 @@ describe("/send/:txnId", () => { depth: 12, }; - const signedPdu = await signEvent(pdu, signature, "synapse2"); + const signedPdu = await signEvent(pdu, hs1Content.signature, "hs1"); const resp = await app.handle( - new Request("https://localhost/send/txnId", { - headers: { - authorization: "Bearer invalid", - "content-type": "application/json", - }, - method: "PUT", - body: JSON.stringify({ - pdus: [signedPdu], - }), + await hs1Content.makeRequest< + "PUT", + `/_matrix/federation/v1/send/${string}` + >("PUT", `/_matrix/federation/v1/send/${transactionid}`, { + pdus: [signedPdu], }), ); @@ -140,46 +106,52 @@ describe("/send/:txnId", () => { expect(resp.status).toBe(200); expect(data).toHaveProperty("pdus"); expect(data.pdus).toStrictEqual({ - [id]: {}, + pdus: { + [id]: {}, + }, }); }); it("Should reject if the pdu is invalid", async () => { - const signature = await generateKeyPairsFromString( - "ed25519 a_yNbw tBD7FfjyBHgT4TwhwzvyS9Dq2Z9ck38RRQKaZ6Sz2z8", - ); - const pdu = { - event_id: "1664987618773:synapse2", - room_id: "!room:synapse2", + room_id: "!room:hs1", type: "m.room.member", content: { membership: "join", avatar_url: null, displayname: "rodrigo2", }, - origin: "synapse2", + auth_events: [ + "$A1NdD_Lf1IvcHeg0-pkApLWpKbputIaZ_Z4yIHK5YDg", + "$dOOm8jYy4ioI77w2AbySU1NavHhU7US4Lukm76aOf5w", + "$BLMgX0J7Gd4JZZzTsprQjJWtEfGlccgPUYC7XQyg2ds", + ], + prev_events: ["$js6Vn-9W65pkvfigwsod3xqyvA7pRqDOKOcCJ69AxVs"], + origin: "hs1", origin_server_ts: 1664987618773, - sender: "@rodrigo2:synapse2", + sender: "@rodrigo2:hs1", unsigned: { age: 2, }, + depth: 12, }; + const transactionid = "asd"; - const signedPdu = await signJson(pdu, signature, "synapse2"); + const signedPdu = await signJson( + pruneEventDict(pdu), + hs1Content.signature, + "hs1", + ); - signedPdu.content.membership = "invalid"; + signedPdu.content!.membership = "invalid"; const resp = await app.handle( - new Request("https://localhost/send/txnId", { - headers: { - authorization: "Bearer invalid", - "content-type": "application/json", - }, - method: "PUT", - body: JSON.stringify({ - pdus: [signedPdu], - }), + await hs1Content.makeRequest< + "PUT", + `/_matrix/federation/v1/send/${string}` + >("PUT", `/_matrix/federation/v1/send/${transactionid}`, { + pdus: [signedPdu], + edus: [], }), ); @@ -188,7 +160,11 @@ describe("/send/:txnId", () => { expect(resp.status).toBe(200); expect(data).toHaveProperty("pdus"); expect(data.pdus).toStrictEqual({ - [id]: {}, + pdus: { + [id]: { + error: {}, + }, + }, }); }); }); @@ -197,61 +173,22 @@ describe("/send/:txnId", () => { describe("/send/:txnId using real case", () => { describe("PDU validation", () => { let app: Elysia; + + let hs1Content: Awaited>; let signature: SigningKey; beforeAll(async () => { - signature = await generateKeyPairsFromString( - "ed25519 a_HDhg WntaJ4JP5WbZZjDShjeuwqCybQ5huaZAiowji7tnIEw", - ); + hs1Content = await hs1.build(); + rc1.withLocalSigningKey("hs1", hs1Content.signature); + const rc1Content = await rc1.build(); - app = new Elysia() - .decorate("config", { - path: "./config.json", - signingKeyPath: "./keys/ed25519.signing.key", - port: 8080, - signingKey: [ - await generateKeyPairsFromString( - "ed25519 a_XRhW YjbSyfqQeGto+OFswt+XwtJUUooHXH5w+czSgawN63U", - ), - ], - name: "synapse2", - version: "org.matrix.msc3757.10", - }) - .decorate("mongo", { - getValidPublicKeyFromLocal: async () => { - return toUnpaddedBase64(signature.publicKey); - }, - storePublicKey: async () => { - return; - }, - eventsCollection: { - findOne: async () => { - return; - }, - findOneAndUpdate: async () => { - return; - }, - insertMany: async () => { - return; - }, - }, - createStagingEvent: async () => { - return; - }, - serversCollection: { - findOne: async () => { - return; - }, - } as any, - }) - .use(sendTransactionRoute); + signature = rc1Content.signature; + app = rc1Content.app.group("/_matrix/federation/v1", (app) => + app.use(sendTransactionRoute), + ); }); it("real case", async () => { - const signature = await generateKeyPairsFromString( - "ed25519 a_HDhg tBD7FfjyBHgT4TwhwzvyS9Dq2Z9ck38RRQKaZ6Sz2z8", - ); - const request = { origin: "hs1", origin_server_ts: 1734360416888, @@ -292,7 +229,7 @@ describe("/send/:txnId using real case", () => { ], }; const resp = await app.handle( - new Request("https://localhost/send/txnId", { + new Request("https://localhost/_matrix/federation/v1/send/txnId", { headers: { authorization: "Bearer invalid", "content-type": "application/json", @@ -307,7 +244,9 @@ describe("/send/:txnId using real case", () => { expect(resp.status).toBe(200); expect(data).toHaveProperty("pdus"); expect(data.pdus).toStrictEqual({ - [`${id}`]: {}, + pdus: { + [`${id}`]: {}, + }, }); }); }); diff --git a/packages/homeserver/src/routes/federation/sendTransaction.ts b/packages/homeserver/src/routes/federation/sendTransaction.ts index 408775e6..11604519 100644 --- a/packages/homeserver/src/routes/federation/sendTransaction.ts +++ b/packages/homeserver/src/routes/federation/sendTransaction.ts @@ -17,20 +17,32 @@ import { import { isConfigContext } from "../../plugins/isConfigContext"; import { MatrixError } from "../../errors"; import { isRoomMemberEvent } from "@hs/core/src/events/m.room.member"; +import { makeRequest } from "../../makeRequest"; +import { isMutexContext, routerWithMutex } from "../../plugins/mutex"; +import { processPDUsByRoomId } from "../../procedures/processPDU"; -export const sendTransactionRoute = new Elysia().put( - "/send/:txnId", - async ({ params, body, ...context }) => { +export const sendTransactionRoute = new Elysia() + .use(routerWithMutex) + .put("/send/:txnId", async ({ params, body, ...context }) => { if (!isConfigContext(context)) { throw new Error("No config context"); } if (!isMongodbContext(context)) { throw new Error("No mongodb context"); } + if (!isMutexContext(context)) { + throw new Error("No mutex context"); + } const { config, - mongo: { eventsCollection, createStagingEvent }, + mongo: { + getEventsByIds, + createStagingEvent, + createEvent, + removeEventFromStaged, + getOldestStagedEvent, + }, } = context; const { pdus, edus = [] } = body as any; @@ -39,7 +51,6 @@ export const sendTransactionRoute = new Elysia().put( throw new MatrixError("400", "Too many edus"); } - console.log("1"); const isValidPDU = ( pdu: any, ): pdu is SignedJson> => { @@ -160,27 +171,81 @@ export const sendTransactionRoute = new Elysia().put( } }; - const resultPDUs = {} as { - [key: string]: Record; + /** + * Based on the fetched events from the remote server, we check if there are any new events (that haven't been stored yet) + * @param fetchedEvents + * @returns + */ + + const getNewEvents = async ( + roomId: string, + fetchedEvents: EventBase[], + ) => { + const fetchedEventsIds = fetchedEvents.map(generateId); + const storedEvents = await getEventsByIds(roomId, fetchedEventsIds); + return fetchedEvents + .filter( + (event) => !storedEvents.find((e) => e._id === generateId(event)), + ) + .sort((a, b) => a.depth - b.depth); }; - for (const [roomId, pdus] of pdusByRoomId) { - // const roomVersion = getRoomVersion - for (const pdu of pdus) { - try { - await validatePdu(pdu); - resultPDUs[`${generateId(pdu)}`] = {}; - void createStagingEvent(pdu); - } catch (e) { - console.error("error validating pdu", e); - resultPDUs[`${generateId(pdu)}`] = e as any; - } + const processMissingEvents = async (roomId: string) => { + const lock = await context.mutex.request(roomId); + if (!lock) { + return false; + } + const event = await getOldestStagedEvent(roomId); + + if (!event) { + return false; } - } - return { - pdus: resultPDUs, + const { _id: pid, event: pdu } = event; + + const fetchedEvents = await makeRequest({ + method: "POST", + domain: pdu.origin, + uri: `/_matrix/federation/v1/get_missing_events/${pdu.room_id}`, + body: { + earliest_events: pdu.prev_events, + latest_events: [pid], + limit: 10, + min_depth: 10, + }, + signingName: config.name, + }); + + const newEvents = await getNewEvents(roomId, fetchedEvents.events); + // in theory, we have all the new events + await removeEventFromStaged(roomId, pid); + + for await (const event of newEvents) { + await createStagingEvent(event); + } + + await lock.release(); + + return true; }; + const result = { + pdus: {}, + }; + for await (const [roomId, pdus] of pdusByRoomId) { + Object.assign( + result.pdus, + await processPDUsByRoomId( + roomId, + pdus, + validatePdu, + getEventsByIds, + createStagingEvent, + createEvent, + processMissingEvents, + generateId, + ), + ); + } + return result; } - }, -); + }); diff --git a/packages/homeserver/src/signEvent.ts b/packages/homeserver/src/signEvent.ts index acd97108..f311d0d5 100644 --- a/packages/homeserver/src/signEvent.ts +++ b/packages/homeserver/src/signEvent.ts @@ -1,4 +1,4 @@ -import { computeHash } from "./authentication"; +import { computeAndMergeHash } from "./authentication"; import type { EventBase } from "@hs/core/src/events/eventBase"; import type { SigningKey } from "./keys"; import { pruneEventDict } from "./pruneEventDict"; @@ -21,7 +21,7 @@ export const signEvent = async ( signingName: string, ): Promise> => { const s = await signJson( - pruneEventDict(computeHash(event)), + pruneEventDict(computeAndMergeHash(event)), signature, signingName, );