diff --git a/index.js b/index.js index 5a331d3..e8ddf56 100644 --- a/index.js +++ b/index.js @@ -74,6 +74,10 @@ function ShareDbMongo(mongo, options) { } this._middleware = new MiddlewareHandler(); + + this._sessions = Object.create(null); + this._transactionOpLinks = Object.create(null); + this._lockedCollections = Object.create(null); }; ShareDbMongo.prototype = Object.create(DB.prototype); @@ -203,12 +207,54 @@ ShareDbMongo.prototype.close = function(callback) { // **** Commit methods ShareDbMongo.prototype.commit = function(collectionName, id, op, snapshot, options, callback) { + options = options || {}; var self = this; var request = createRequestForMiddleware(options, collectionName, op); - this._writeOp(collectionName, id, op, snapshot, function(err, result) { + + var cb = callback; + callback = function(error, succeeded) { + if (error && error.code === ShareDbMongo.sessionNotStartedError().code) { + // Session starting is handled automatically by ShareDB, so if a session hasn't + // started, it actually means that a session was closed early, possibly to avoid + // a transaction deadlock. In this case, swallow the error and tell ShareDB that + // the commit hasn't succeeded, which will trigger a retry (including restarting + // the transaction) + error = null; + succeeded = false; + } + cb(error, succeeded); + }; + + // MongoDB locks collections when writing transactions. This can lead to deadlock if + // we have two different transactions attempting to update the same collection. + // Here, we manually keep track of the collections involved in transactions, so that + // we can avoid this deadlock. + var isLocked = this._lockedCollections[collectionName] && + this._lockedCollections[collectionName] !== options.transaction; + + if (isLocked) { + // In order to avoid deadlock, let's impose a priority system: + // - commits without transactions take highest priority, since they're most likely to succeed + // - after that, determine priority on alphabetical order of Transaction ID + var hasLockPriority = !options.transaction || options.transactionId < this._lockedCollections[collectionName]; + // If we don't have priority, return a failed write to allow a retry, which should hopefully + // allow the other transaction to complete + if (!hasLockPriority) return callback(null, false); + + // If we do have lock priority, we should abort the other transaction to unblock our + // current transaction + // TODO: Wait for this? + this.abortTransaction(this._lockedCollections[collectionName], function(error) { + // TODO: Handle errors + }); + } + + if (options.transaction) this._lockedCollections[collectionName] = options.transaction; + + this._writeOp(collectionName, id, op, snapshot, options, function(err, result) { if (err) return callback(err); var opId = result.insertedId; - self._writeSnapshot(request, id, snapshot, opId, function(err, succeeded) { + self._writeSnapshot(request, id, snapshot, opId, options, function(err, succeeded) { if (succeeded) return callback(err, succeeded); // Cleanup unsuccessful op if snapshot write failed. This is not // necessary for data correctness, but it gets rid of clutter @@ -219,6 +265,72 @@ ShareDbMongo.prototype.commit = function(collectionName, id, op, snapshot, optio }); }; +ShareDbMongo.prototype.startTransaction = function(transactionId, callback) { + if (typeof transactionId !== 'string') throw new Error('Invalid Transaction ID'); + + if (this._sessions[transactionId]) { + // TODO: Proper error code + return callback(new Error('Transaction already in progress')); + } + + this._transactionOpLinks[transactionId] = []; + var session = this._sessions[transactionId] = this._mongoClient.startSession(); + session.startTransaction(); + + var self = this; + session.once('ended', function() { + self._cleanUpTransaction(transactionId); + }); + + callback(); +}; + +ShareDbMongo.prototype.restartTransaction = function(transactionId, callback) { + this._cleanUpTransaction(transactionId); + this.startTransaction(transactionId, callback); +}; + +ShareDbMongo.prototype.commitTransaction = function(transactionId, callback) { + var self = this; + var cb = function(error) { + self._cleanUpTransaction(transactionId); + callback(error); + }; + + if (!this._sessions[transactionId]) return cb(); + this._sessions[transactionId].commitTransaction() + .then(function() { + cb(); + }, cb); +}; + +ShareDbMongo.prototype.abortTransaction = function(transactionId, callback) { + var self = this; + var cb = function(error) { + self._cleanUpTransaction(transactionId); + callback(error); + }; + + if (!this._sessions[transactionId]) return cb(); + this._sessions[transactionId].abortTransaction() + .then(function() { + cb(); + }, cb); +}; + +ShareDbMongo.prototype._cleanUpTransaction = function(transactionId) { + var session = this._sessions[transactionId]; + if (session) session.endSession(); + delete this._sessions[transactionId]; + delete this._transactionOpLinks[transactionId]; + // TODO: Improve performance? + for (var collection in this._lockedCollections) { + if (this._lockedCollections[collection] === transactionId) { + delete this._lockedCollections[collection]; + } + } +}; + function createRequestForMiddleware(options, collectionName, op, fields) { // Create a new request object which will be passed to helper functions and middleware var request = { @@ -236,18 +348,28 @@ function createRequestForMiddleware(options, collectionName, op, fields) { return request; } -ShareDbMongo.prototype._writeOp = function(collectionName, id, op, snapshot, callback) { +ShareDbMongo.prototype._writeOp = function(collectionName, id, op, snapshot, options, callback) { if (typeof op.v !== 'number') { var err = ShareDbMongo.invalidOpVersionError(collectionName, id, op.v); return callback(err); } + var self = this; this.getOpCollection(collectionName, function(err, opCollection) { if (err) return callback(err); var doc = shallowClone(op); doc.d = id; - doc.o = snapshot._opLink; - opCollection.insertOne(doc) + var transactionLinks = self._transactionOpLinks[options.transaction] || []; + doc.o = transactionLinks[op.v - 1] || snapshot._opLink; + var session; + if (options.transaction) { + session = self._sessions[options.transaction]; + if (!session) return callback(ShareDbMongo.sessionNotStartedError()); + } + opCollection.insertOne(doc, {session: session}) .then(function(result) { + if (options.transaction) { + self._transactionOpLinks[options.transaction][op.v] = result.insertedId; + } callback(null, result); }, callback); }); @@ -263,17 +385,22 @@ ShareDbMongo.prototype._deleteOp = function(collectionName, opId, callback) { }); }; -ShareDbMongo.prototype._writeSnapshot = function(request, id, snapshot, opId, callback) { +ShareDbMongo.prototype._writeSnapshot = function(request, id, snapshot, opId, options, callback) { var self = this; this.getCollection(request.collectionName, function(err, collection) { if (err) return callback(err); request.documentToWrite = castToDoc(id, snapshot, opId); + var session; + if (options.transaction) { + session = self._sessions[options.transaction]; + if (!session) return callback(ShareDbMongo.sessionNotStartedError()); + } if (request.documentToWrite._v === 1) { self._middleware.trigger(MiddlewareHandler.Actions.beforeCreate, request, function(middlewareErr) { if (middlewareErr) { return callback(middlewareErr); } - collection.insertOne(request.documentToWrite) + collection.insertOne(request.documentToWrite, {session: session}) .then( function() { callback(null, true); @@ -294,7 +421,7 @@ ShareDbMongo.prototype._writeSnapshot = function(request, id, snapshot, opId, ca if (middlewareErr) { return callback(middlewareErr); } - collection.replaceOne(request.query, request.documentToWrite) + collection.replaceOne(request.query, request.documentToWrite, {session: session}) .then(function(result) { var succeeded = !!result.modifiedCount; callback(null, succeeded); @@ -1668,6 +1795,10 @@ ShareDbMongo.parseQueryError = function(err) { err.code = 5104; return err; }; +ShareDbMongo.sessionNotStartedError = function() { + // TODO: Proper code + return {code: 5105, message: 'Session not started'}; +}; // Middleware diff --git a/package.json b/package.json index 6332045..32f91a5 100644 --- a/package.json +++ b/package.json @@ -5,7 +5,7 @@ "main": "index.js", "dependencies": { "mongodb": "^3.1.13 || ^4.0.0 || ^5.0.0 || ^6.0.0", - "sharedb": "^1.9.1 || ^2.0.0 || ^3.0.0 || ^4.0.0 || ^5.0.0" + "sharedb": "file:../sharedb" }, "devDependencies": { "async": "^3.2.4", diff --git a/test/test_mongo.js b/test/test_mongo.js index 36ba42c..521e5ec 100644 --- a/test/test_mongo.js +++ b/test/test_mongo.js @@ -18,7 +18,7 @@ function create(callback) { }); }; -require('sharedb/test/db')({create: create, getQuery: getQuery}); +require('sharedb/test/db')({create: create, getQuery: getQuery, transactions: true}); describe('mongo db', function() { beforeEach(function(done) {