diff --git a/README.md b/README.md index 4ecceff..9624b0d 100644 --- a/README.md +++ b/README.md @@ -45,9 +45,8 @@ Add a hyper* data structure to replicate. upload: bool, // passed to .replicate download: bool, // passed to .replicate encrypt: bool, // passed to .replicate - discoveryKey: , // optionally set your own discovery key - announce: true, // should the swarm announce you? - lookup: true, // should the swarm do lookups for you? + server: true, // should the swarm announce you? + client: true, // should the swarm do lookups for you? keyPair: { publicKey, secretKey }, // noise keypair used for the connection onauthenticate (remotePublicKey, done) // the onauthenticate hook to verify remote key pairs } diff --git a/index.js b/index.js index 9c7c497..f320934 100644 --- a/index.js +++ b/index.js @@ -1,5 +1,5 @@ const pump = require('pump') -const hyperswarm = require('hyperswarm') +const Hyperswarm = require('hyperswarm') const Protocol = require('hypercore-protocol') const { EventEmitter } = require('events') @@ -26,15 +26,7 @@ module.exports = class Replicator extends EventEmitter { constructor (options = {}) { super() - this.swarm = hyperswarm({ - announceLocalAddress: !!options.announceLocalAddress, - preferredPort: 49737, - bootstrap: options.bootstrap, - queue: { - multiplex: true - } - }) - + this.swarm = new Hyperswarm(options) this.createStream = options.createStream || ((init) => new Protocol(init, options)) this.swarm.on('connection', this._onconnection.bind(this)) this.swarming = new Map() @@ -72,7 +64,10 @@ module.exports = class Replicator extends EventEmitter { stream.on('discovery-key', this._ondiscoverykey.bind(this, stream)) this.streams.add(stream) - stream.on('close', () => this.streams.delete(stream)) + stream.on('close', () => { + this.streams.delete(stream) + this.emit('delete', info, stream) + }) } _ondiscoverykey (stream, discoveryKey) { @@ -92,22 +87,18 @@ module.exports = class Replicator extends EventEmitter { this.swarm.listen() } - destroy () { - return new Promise((resolve, reject) => { - this.swarm.destroy((err) => { - if (err) return reject(err) - this.emit('close') - resolve() - }) - }) + async destroy () { + await this.swarm.clear() + await this.swarm.destroy() + this.emit('close') } async add (core, options = {}) { await ready(core) const key = core.discoveryKey.toString('hex') - const { announce, lookup } = options - const defaultLookup = lookup === undefined && announce === undefined + const { server, client } = options + const defaultClient = client === undefined && server === undefined const added = this.swarming.has(key) const one = new Event() @@ -115,9 +106,11 @@ module.exports = class Replicator extends EventEmitter { this.swarming.set(key, { core, options, one, all }) - if (announce || lookup || defaultLookup) { - this.swarm.join(core.discoveryKey, { announce: !!announce, lookup: !!lookup || defaultLookup }) - this.swarm.flush(onflush) + if (server || client || defaultClient) { + const discovery = this.swarm.join(core.discoveryKey, { server: !!server, client: !!client || defaultClient }) + server && await discovery.flushed() + client && await this.swarm.flush() + onflush() } else { onflush() } diff --git a/package.json b/package.json index f1d3cd3..d346c06 100644 --- a/package.json +++ b/package.json @@ -10,14 +10,14 @@ "license": "MIT", "dependencies": { "hypercore-protocol": "^8.0.7", - "hyperswarm": "^2.13.0", + "hyperswarm": "^3.0.3", "pump": "^3.0.0" }, "devDependencies": { - "dht-rpc": "^4.9.6", - "hypercore": "^9.7.5", + "@hyperswarm/dht": "^5.0.17", + "hypercore": "^9.12.0", "random-access-memory": "^3.1.2", - "standard": "^14.3.1", + "standard": "^16.0.4", "tape": "^5.1.1" }, "repository": { diff --git a/test/basic.js b/test/basic.js index f922bc7..5afde6e 100644 --- a/test/basic.js +++ b/test/basic.js @@ -1,58 +1,56 @@ -const hypercore = require('hypercore') +const Hypercore = require('hypercore') const ram = require('random-access-memory') const { get, append, test } = require('./helpers') test('basic', async function (t, replicator, clone) { - const core = hypercore(ram) + const a = new Hypercore(ram) + await replicator.add(a, { server: true, client: false }) - replicator.add(core, { announce: true, lookup: false }) + await append(a, 'test') - await append(core, 'test') + const aClone = new Hypercore(ram, a.key) + await clone.add(aClone, { client: true, server: false }) - const coreClone = hypercore(ram, core.key) - - clone.add(coreClone, { lookup: true, announce: false }) - - t.same(await get(coreClone, 0), Buffer.from('test')) + t.same(await get(aClone, 0), Buffer.from('test')) }) test('multi core swarm', async function (t, replicator, clone) { - const a = hypercore(ram) - const b = hypercore(ram) + const a = new Hypercore(ram) + const b = new Hypercore(ram) - replicator.add(a, { announce: true, lookup: false }) - replicator.add(b, { announce: true, lookup: false }) + await replicator.add(a, { server: true, client: false }) + await replicator.add(b, { server: true, client: false }) await append(a, 'a test') await append(b, 'b test') - const aClone = hypercore(ram, a.key) - const bClone = hypercore(ram, b.key) + const aClone = new Hypercore(ram, a.key) + const bClone = new Hypercore(ram, b.key) - clone.add(bClone, { lookup: true, announce: false }) - clone.add(aClone, { lookup: true, announce: false }) + clone.add(bClone, { client: true, server: false }) + clone.add(aClone, { client: true, server: false }) t.same(await get(aClone, 0), Buffer.from('a test')) t.same(await get(bClone, 0), Buffer.from('b test')) }) test('multi core swarm higher latency', async function (t, replicator, clone) { - const a = hypercore(ram) - const b = hypercore(ram) + const a = new Hypercore(ram) + const b = new Hypercore(ram) - replicator.add(a, { announce: true, lookup: false }) + await replicator.add(a, { server: true, client: false }) await append(a, 'a test') await append(b, 'b test') - const aClone = hypercore(ram, a.key) - const bClone = hypercore(ram, b.key) + const aClone = new Hypercore(ram, a.key) + const bClone = new Hypercore(ram, b.key) - clone.add(bClone, { lookup: true, announce: false }) - clone.add(aClone, { lookup: true, announce: false }) + await clone.add(bClone, { client: true, server: false }) + await clone.add(aClone, { client: true, server: false }) - replicator.on('discovery-key', function () { - replicator.add(b, { announce: true, lookup: false }) + replicator.on('discovery-key', () => { + replicator.add(b, { server: true, client: false }) }) t.same(await get(aClone, 0), Buffer.from('a test')) diff --git a/test/helpers/index.js b/test/helpers/index.js index 2229b38..71d84f4 100644 --- a/test/helpers/index.js +++ b/test/helpers/index.js @@ -1,7 +1,19 @@ -const dht = require('dht-rpc') +const HyperDHT = require('@hyperswarm/dht') const tape = require('tape') const Replicator = require('../../') +async function test (msg, fn) { + tape(msg, async t => { + const [bootstrap, destroyBootstrap] = await createBootstrap() + const replicators = fill(2).map(() => + new Replicator({ bootstrap }) + ) + await fn(t, ...replicators) + await destroy(replicators) + await destroyBootstrap() + }) +} + module.exports = { get, append, ready, test } function get (core, seq) { @@ -31,36 +43,48 @@ function ready (core) { }) } -function test (msg, fn) { - tape(msg, function (t) { - return new Promise((resolve, reject) => { - const bootstraper = dht({ ephemeral: true }) - - bootstraper.listen(0, async function () { - const replicators = [makeReplicator(), makeReplicator()] +async function createBootstrap () { + const bootstrappers = fill(2).map(() => + new HyperDHT({ + ephemeral: true, + bootstrap: [] + }) + ) + await init(bootstrappers) + const bootstrap = bootstrappers.map(node => ({ + host: '127.0.0.1', + port: node.address().port + })) + const nodes = fill(2).map(() => + new HyperDHT({ + ephemeral: false, + bootstrap + }) + ) + await init(nodes) + return [bootstrap, () => destroy(bootstrappers, nodes)] +} - let missing = replicators.length - for (const r of replicators) { - r.on('close', () => { - if (--missing > 0) return - bootstraper.destroy() - }) - } +async function init (...nodes) { + return await Promise.all(nodes.map(async node => { + if (Array.isArray(node)) { + await init(...node) + } else { + await node.ready() + } + })) +} - try { - await fn(t, ...replicators) - resolve() - } catch (err) { - reject(err) - } finally { - for (const r of replicators) r.destroy() - } +async function destroy (...nodes) { + return await Promise.all(nodes.map(async node => { + if (Array.isArray(node)) { + await destroy(...node) + } else { + await node.destroy() + } + })) +} - function makeReplicator () { - const bootstrap = ['localhost:' + bootstraper.address().port] - return new Replicator({ bootstrap }) - } - }) - }) - }) +function fill (n) { + return Array(n).fill(null) }