diff --git a/src/drivers/abstract/index.js b/src/drivers/abstract/index.js index 48bf4a78..5b342d1b 100644 --- a/src/drivers/abstract/index.js +++ b/src/drivers/abstract/index.js @@ -12,6 +12,33 @@ module.exports = class AbstractDriver { constructor(opts = {}) { const { logger } = opts; this.logger = logger instanceof Logger ? logger : new Logger(logger); + this.idleTimeout = opts.idleTimeout || 60; + this.options = opts; + } + + closeConnection(connection) { + throw new Error('not implemented'); + } + + recycleConnections() { + const acquiredAt = new Map(); + const timeout = this.idleTimeout * 1000; + + this.pool.on('acquire', function onAcquire(connection) { + acquiredAt.set(connection, Date.now()); + }); + + const checkIdle = () => { + const now = Date.now(); + for (const [ connection, timestamp ] of acquiredAt) { + if (now - timestamp > timeout) { + this.closeConnection(connection); + acquiredAt.delete(connection); + } + } + setTimeout(checkIdle, timeout); + }; + checkIdle(); } /** diff --git a/src/drivers/mysql/index.js b/src/drivers/mysql/index.js index d743cf83..be6743f1 100644 --- a/src/drivers/mysql/index.js +++ b/src/drivers/mysql/index.js @@ -21,17 +21,31 @@ class MysqlDriver extends AbstractDriver { * @param {boolean} opts.stringifyObjects - stringify object value in dataValues */ constructor(opts = {}) { + super(opts); + this.type = 'mysql'; + this.pool = this.createPool(opts); + this.escape = this.pool.escape.bind(this.pool); + this.recycleConnections(); + } + + get escapeId() { + return this.pool.escapeId; + } + + createPool(opts) { + // some RDMS use appName to locate the database instead of the actual db, though the table_schema stored in infomation_schema.columns is still the latter one. + const database = opts.appName || opts.database; const client = opts.client || 'mysql'; + const { + host, port, user, password, + connectionLimit, charset, stringifyObjects = false, + } = opts; + if (client !== 'mysql' && client !== 'mysql2') { throw new Error(`Unsupported mysql client ${client}`); } - const { host, port, user, password, connectionLimit, charset, stringifyObjects = false } = opts; - // some RDMS use appName to locate the database instead of the actual db, though the table_schema stored in infomation_schema.columns is still the latter one. - const database = opts.appName || opts.database; - super(opts); - this.type = 'mysql'; - this.database = database; - this.pool = require(client).createPool({ + + return require(client).createPool({ connectionLimit, host, port, @@ -41,12 +55,6 @@ class MysqlDriver extends AbstractDriver { charset, stringifyObjects, }); - - this.escape = this.pool.escape.bind(this.pool); - } - - get escapeId() { - return this.pool.escapeId; } getConnection() { @@ -61,11 +69,16 @@ class MysqlDriver extends AbstractDriver { }); } + closeConnection(connection) { + connection.release(); + connection.destroy(); + } + async query(query, values, opts = {}) { - const { pool, logger } = this; - const { connection } = opts; + const { logger } = this; + const connection = opts.connection || await this.getConnection(); const promise = new Promise((resolve, reject) => { - (connection || pool).query(query, values, (err, results, fields) => { + connection.query(query, values, (err, results, fields) => { if (err) { reject(err); } else { @@ -82,6 +95,8 @@ class MysqlDriver extends AbstractDriver { } catch (err) { logger.logQueryError(sql, err, Date.now() - start, opts); throw err; + } finally { + if (!opts.connection) connection.release(); } logger.logQuery(sql, Date.now() - start, opts); diff --git a/src/drivers/mysql/schema.js b/src/drivers/mysql/schema.js index 1002ed00..5b8c4b3f 100644 --- a/src/drivers/mysql/schema.js +++ b/src/drivers/mysql/schema.js @@ -58,7 +58,8 @@ module.exports = { * @param {string} newColumn the new column name */ async renameColumn(table, name, newName) { - const { database, escapeId } = this; + const { escapeId } = this; + const { database } = this.options; const { columnName } = new Attribute(name); const schemaInfo = await this.querySchemaInfo(database, table); const { columnName: _, ...columnInfo } = schemaInfo[table].find(entry => { diff --git a/src/drivers/postgres/index.js b/src/drivers/postgres/index.js index 5cc77705..28e90230 100644 --- a/src/drivers/postgres/index.js +++ b/src/drivers/postgres/index.js @@ -107,21 +107,30 @@ function parameterize(sql, values) { class PostgresDriver extends AbstractDriver { constructor(opts = {}) { super(opts); - const { host, port, user, password, database } = opts; - this.type = 'postgres'; - this.pool = new Pool({ host, port, user, password, database }); + this.pool = this.createPool(opts); + this.recycleConnections(); + } + + createPool(opts) { + const { host, port, user, password, database } = opts; + return new Pool({ host, port, user, password, database }); } async getConnection() { return await this.pool.connect(); } + async closeConnection(client) { + client.release(); + await client.end(); + } + async query(query, values, spell = {}) { const { sql, nestTables } = typeof query === 'string' ? { sql: query } : query; const { text } = parameterize(sql, values); const { logger } = this; - const client = spell && spell.connection || this.pool; + const connection = spell.connection || await this.getConnection(); const command = sql.slice(0, sql.indexOf(' ')).toLowerCase(); async function tryQuery(...args) { @@ -130,10 +139,12 @@ class PostgresDriver extends AbstractDriver { let result; try { - result = await client.query(...args); + result = await connection.query(...args); } catch (err) { logger.logQueryError(formatted, err, Date.now() - start, spell); throw err; + } finally { + if (!spell.connection) connection.release(); } logger.logQuery(formatted, Date.now() - start, spell); diff --git a/src/drivers/sqlite/index.js b/src/drivers/sqlite/index.js index 26526974..5a6df607 100644 --- a/src/drivers/sqlite/index.js +++ b/src/drivers/sqlite/index.js @@ -1,5 +1,6 @@ 'use strict'; +const EventEmitter = require('events'); const strftime = require('strftime'); const AbstractDriver = require('../abstract'); @@ -34,11 +35,11 @@ function nest(rows, fields, spell) { } class Connection { - constructor({ client, database, mode, logger }) { + constructor({ client, database, mode, pool }) { const { Database, OPEN_READWRITE, OPEN_CREATE } = client; if (mode == null) mode = OPEN_READWRITE | OPEN_CREATE; this.database = new Database(database, mode); - this.logger = logger; + this.pool = pool; } async query(query, values, spell) { @@ -48,15 +49,14 @@ class Connection { const result = await this.all(sql, values); if (nestTables) return nest(result.rows, result.fields, spell); return result; - } else { - return await this.run(sql, values); } + return await this.run(sql, values); } all(sql, values) { return new Promise((resolve, reject) => { this.database.all(sql, values, (err, rows, fields) => { - if (err) reject(new Error(err.stack)); + if (err) reject(err); else resolve({ rows, fields }); }); }); @@ -70,39 +70,88 @@ class Connection { }); }); } + + release() { + this.pool.releaseConnection(this); + } + + async destroy() { + const { connections } = this.pool; + const index = connections.indexOf(this); + if (index >= 0) connections.splice(index, 1); + + return await new Promise((resolve, reject) => { + this.database.close(function(err) { + if (err) reject(err); + resolve(); + }); + }); + } +} + +class Pool extends EventEmitter { + constructor(opts) { + super(opts); + this.options = { + connectionLimit: 10, + ...opts, + client: opts.client || 'sqlite3', + }; + this.client = require(this.options.client); + this.connections = []; + this.queue = []; + } + + async getConnection() { + const { connections, queue, client, options } = this; + for (const connection of connections) { + if (connection.idle) { + connection.idle = false; + this.emit('acquire', connection); + return connection; + } + } + if (connections.length < options.connectionLimit) { + const connection = new Connection({ ...options, client, pool: this }); + connections.push(connection); + this.emit('acquire', connection); + return connection; + } + await new Promise(resolve => queue.push(resolve)); + return await this.getConnection(); + } + + releaseConnection(connection) { + connection.idle = true; + this.emit('release', connection); + + const { queue } = this; + while (queue.length > 0) { + const task = queue.shift(); + task(); + } + } } class SqliteDriver extends AbstractDriver { constructor(opts = {}) { super(opts); - const { logger } = this; - const client = require(opts.client || 'sqlite3'); this.type = 'sqlite'; - this.connections = [ new Connection({ ...opts, client, logger }) ]; - this.callbacks = []; + this.pool = this.createPool(opts); + this.recycleConnections(); } - async getConnection() { - const { connections, callbacks } = this; - - if (connections.length > 0) { - const connection = connections.shift(); - return Object.assign(connection, { - release() { - connections.push(connection); - while (callbacks.length > 0) { - const callback = callbacks.shift(); - callback(); - } - }, - }); - } + createPool(opts) { + return new Pool(opts); + } - await new Promise((resolve) => { - callbacks.push(resolve); - }); + async getConnection() { + return await this.pool.getConnection(); + } - return this.getConnection(); + async closeConnection(connection) { + connection.release(); + await connection.destroy(); } async query(query, values, opts = {}) { diff --git a/test/unit/adapters/sequelize.test.js b/test/unit/adapters/sequelize.test.js index 3c0a6a63..be866ada 100644 --- a/test/unit/adapters/sequelize.test.js +++ b/test/unit/adapters/sequelize.test.js @@ -351,9 +351,8 @@ describe('=> Sequelize adapter', () => { ].map(opts => Post.create(opts))); const stub = sinon.stub(logger, 'warn').callsFake((tag, message) => { - throw new Error(message); - } - ); + throw new Error(message); + }); let posts = await Post.findAll({ where: { @@ -364,7 +363,7 @@ describe('=> Sequelize adapter', () => { assert.equal(posts[0].title, 'Leah'); await Post.destroy({ title: 'Leah' }); - const post = await Post.findOne({ title: 'Leah' }); + const post = await Post.findOne({ where: { title: 'Leah' } }); assert(!post); posts = await Post.findAll({ @@ -494,7 +493,7 @@ describe('=> Sequelize adapter', () => { { title: 'Tyrael' }, ].map(opts => Post.create(opts))); await Post.destroy({ title: 'Leah' }); - const post = await Post.findOne({ title: 'Leah' }); + const post = await Post.findOne({ where: { title: 'Leah' } }); assert(!post); const post1 = await Post.findOne({ where: { title: 'Leah' }, paranoid: false }); assert.equal(post1.title, 'Leah'); @@ -518,7 +517,7 @@ describe('=> Sequelize adapter', () => { const post = await Post.findOne(posts[1].id); assert.equal(post.title, 'Tyrael'); - const post2 = await Post.findOne({ title: 'Leah' }); + const post2 = await Post.findOne({ where: { title: 'Leah' } }); assert.equal(post2.title, 'Leah'); }); @@ -531,7 +530,7 @@ describe('=> Sequelize adapter', () => { const post = await Post.find(posts[1].id); assert.equal(post.title, 'Tyrael'); - const post2 = await Post.find({ title: 'Leah' }); + const post2 = await Post.find({ where: { title: 'Leah' } }); assert.equal(post2.title, 'Leah'); }); diff --git a/test/unit/drivers/abstract/index.test.js b/test/unit/drivers/abstract/index.test.js index 8359e080..c83d8aeb 100644 --- a/test/unit/drivers/abstract/index.test.js +++ b/test/unit/drivers/abstract/index.test.js @@ -1,5 +1,6 @@ 'use strict'; +const EventEmitter = require('events'); const assert = require('assert').strict; const dayjs = require('dayjs'); const { Logger } = require('../../../..'); @@ -55,3 +56,23 @@ describe('=> AbstractDriver#logger', function() { assert.ok(driver.logger instanceof CustomLogger); }); }); + +describe('=> AbstractDriver#recycleConnections', function() { + it('should close idle connections', async function() { + const driver = new AbstractDriver({ idleTimeout: 0.01 }); + driver.pool = new EventEmitter(); + let released; + let destroyed; + driver.recycleConnections(); + driver.closeConnection = function() { + released = true; + destroyed = true; + }; + driver.pool.emit('acquire', {}); + assert.ok(!released); + assert.ok(!destroyed); + await new Promise(resolve => setTimeout(resolve, 30)); + assert.ok(released); + assert.ok(destroyed); + }); +}); diff --git a/test/unit/drivers/mysql/index.test.js b/test/unit/drivers/mysql/index.test.js index 3d23f426..3e15ca91 100644 --- a/test/unit/drivers/mysql/index.test.js +++ b/test/unit/drivers/mysql/index.test.js @@ -24,7 +24,7 @@ describe('=> MySQL driver', () => { await driver2.query('SELECT 1'); const [ sql, duration ] = result[0]; assert.equal(sql, 'SELECT 1'); - assert.ok(duration > 0); + assert.ok(duration >= 0); }); it('driver.logger.logQueryError', async () => { @@ -62,7 +62,7 @@ describe('=> MySQL driver', () => { assert.equal(definition.unique, true); }); - it('driver.truncateTable(table', async () => { + it('driver.truncateTable(table)', async () => { const { BIGINT, STRING } = driver.DataTypes; await driver.dropTable('notes'); await driver.createTable('notes', { @@ -74,4 +74,33 @@ describe('=> MySQL driver', () => { await driver.truncateTable('notes'); assert.equal((await driver.query('SELECT count(*) AS count FROM notes')).rows[0].count, 0); }); + + it('driver.recycleConnections()', async function() { + const driver2 = new MysqlDriver({ + ...options, + idleTimeout: 0.01, + }); + let released; + driver2.pool.on('release', function() { + released = true; + }); + const connection = await driver2.getConnection(); + await new Promise(function(resolve, reject) { + connection.query('SELECT 1', function(err, row) { + if (err) reject(err); + resolve({ row }); + }); + }); + assert.ok(!released); + await new Promise(resolve => setTimeout(resolve, 30)); + assert.ok(released); + await assert.rejects(async function() { + await new Promise(function(resolve, reject) { + connection.query('SELECT 1', function(err, row) { + if (err) reject(err); + resolve({ row }); + }); + }); + }, /Error: Cannot enqueue Query after being destroyed./); + }); }); diff --git a/test/unit/drivers/postgres/index.test.js b/test/unit/drivers/postgres/index.test.js index ae2a5868..b5dd6b27 100644 --- a/test/unit/drivers/postgres/index.test.js +++ b/test/unit/drivers/postgres/index.test.js @@ -111,4 +111,17 @@ describe('=> PostgreSQL driver', () => { await driver.query(`INSERT INTO notes (title) VALUES ('Untitled')`); assert.equal((await driver.query('SELECT id FROM notes')).rows[0].id, '1'); }); + + it('driver.recycleConnections()', async function() { + const driver2 = new PostgresDriver({ + ...options, + idleTimeout: 0.01, + }); + const connection = await driver2.getConnection(); + await connection.query('SELECT 1'); + await new Promise(resolve => setTimeout(resolve, 30)); + await assert.rejects(async function() { + await connection.query('SELECT 1'); + }, /Error: Client was closed and is not queryable/); + }); }); diff --git a/test/unit/drivers/sqlite/index.test.js b/test/unit/drivers/sqlite/index.test.js index 409dd339..d65e1d48 100644 --- a/test/unit/drivers/sqlite/index.test.js +++ b/test/unit/drivers/sqlite/index.test.js @@ -161,3 +161,51 @@ describe('=> SQLite driver.query()', () => { assert.equal(is_private, 1); }); }); + +describe('=> SQLite driver.pool', function() { + beforeEach(async () => { + await driver.dropTable('notes'); + }); + + it('should not create connection unless necessary', async function() { + assert.equal(driver.pool.connections.length, 1); + }); + + it('should create connection if upbound limit not reached', async function() { + await Promise.all([ + driver.createTable('notes', { title: STRING, isPrivate: BOOLEAN }), + driver.query('SELECT 2'), + ]); + assert.equal(driver.pool.connections.length, 2); + }); + + it('should wait until connection is available', async function() { + const driver2 = new SqliteDriver({ ...options, connectionLimit: 1 }); + await Promise.all([ + driver2.createTable('notes', { title: STRING, isPrivate: BOOLEAN }), + driver2.query('SELECT 2'), + ]); + assert.equal(driver2.pool.connections.length, 1); + }); + + it('driver.recycleConnections()', async function() { + const driver2 = new SqliteDriver({ + ...options, + idleTimeout: 0.01, + }); + const connection = await driver2.getConnection(); + await connection.query('SELECT 1'); + await new Promise(resolve => setTimeout(resolve, 30)); + await assert.rejects(async function() { + await connection.query('SELECT 1'); + }, /Error: SQLITE_MISUSE: Database is closed/); + + // should remove connection from pool when destroy + assert.equal(driver2.pool.connections.length, 0); + + // should still be able to create new connection + await assert.doesNotReject(async function() { + await driver2.query('SELECT 1'); + }); + }); +});