Skip to content

Commit

Permalink
[#69027] signal: wait until document is loaded
Browse files Browse the repository at this point in the history
  • Loading branch information
MaciejWas committed Nov 20, 2024
1 parent 31ab8ed commit 57e77c4
Showing 1 changed file with 37 additions and 10 deletions.
47 changes: 37 additions & 10 deletions bin/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const wsReadyStateClosed = 3; // eslint-disable-line
const gcEnabled = process.env.GC !== "false" && process.env.GC !== "0";
const persistenceDir = process.env.YPERSISTENCE;
/**
* @type {{bindState: function(string,WSSharedDoc):void, writeState:function(string,WSSharedDoc):Promise<any>, provider: any}|null}
* @type {{bindState: function(string,WSSharedDoc):Promise<void>, writeState:function(string,WSSharedDoc):Promise<any>, provider: any}|null}
*/
let persistence = null;
if (typeof persistenceDir === "string") {
Expand Down Expand Up @@ -150,26 +150,45 @@ class WSSharedDoc extends Y.Doc {
}
}

const getYDocLock = new Map();

const sleep = (ms) => new Promise((r) => setTimeout(r, ms));
/**
* Gets a Y.Doc by name, whether in memory or on disk
*
* @param {string} docname - the name of the Y.Doc to find or create
* @param {boolean} gc - whether to allow gc on the doc (applies only when created)
* @return {WSSharedDoc}
* @return {Promise<WSSharedDoc>}
*/
export const getYDoc = (docname, gc = true) =>
map.setIfUndefined(docs, docname, () => {
export const getYDoc = async (docname, gc = true, attempt = 0) => {
if (attempt > 5) return null;

if (getYDocLock.get(docname)) {
await sleep(2000);
return getYDoc(docname, gc, attempt + 1);
}
try {
getYDocLock.set(docname, true);
var existingDoc = docs.get(docname);
if (existingDoc) return existingDoc;

const doc = new WSSharedDoc(docname);
doc.gc = gc;
if (persistence !== null) {
logAsync(docname, { event: "connection-setup", msg: "Loading persistent document '" + docname + "'" });
persistence.bindState(docname, doc);
await logAsync(docname, { event: "connection-setup", msg: "Initializing persistent document '" + docname + "'" });
await persistence.bindState(docname, doc);
await logAsync(docname, { event: "connection-setup", msg: "Document '" + docname + "' was loaded" });
} else {
logAsync(docname, { event: "connection-setup", msg: "Persistence is disabled: creating a new document '" + docname + "'" });
}
logChanges(doc, docname);
docs.set(docname, doc);

return doc;
});
} finally {
getYDocLock.set(docname, false);
}
};

/** @param {Uint8Array} msg */
const tryDecode = (msg) => {
Expand Down Expand Up @@ -275,16 +294,24 @@ const pingTimeout = 30000;
* @param {any} req
* @param {any} opts
*/
export const setupWSConnection = (conn, req, { docName = req.url.slice(1).split("?")[0], gc = true } = {}) => {
export const setupWSConnection = async (conn, req, { docName = req.url.slice(1).split("?")[0], gc = true } = {}) => {
conn.__connectionId = Math.floor(Math.random() * 100000);
conn.binaryType = "arraybuffer";
logAsync(docName, { event: "connection-setup", msg: "New connection", connectionId: conn.__connectionId });

// get doc, initialize if it does not exist yet
const doc = getYDoc(docName, gc);
const docPromise = getYDoc(docName, gc);

conn.on("message", /** @param {ArrayBuffer} message */ (message) => docPromise.then((doc) => messageListener(conn, doc, new Uint8Array(message))));

const doc = await docPromise;
if (!doc) {
conn.close();
return;
}

doc.conns.set(conn, new Set());
// listen and reply to events
conn.on("message", /** @param {ArrayBuffer} message */ (message) => messageListener(conn, doc, new Uint8Array(message)));

// Check if connection is still alive
let pongReceived = true;
Expand Down

0 comments on commit 57e77c4

Please sign in to comment.