Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to hyperswarm@0.3.3 #4

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ Add a hyper* data structure to replicate.
upload: bool, // passed to .replicate
download: bool, // passed to .replicate
encrypt: bool, // passed to .replicate
discoveryKey: <buf>, // optionally set your own discovery key
announce: true, // should the swarm announce you?
lookup: true, // should the swarm do lookups for you?
server: true, // should the swarm announce you?
client: true, // should the swarm do lookups for you?
keyPair: { publicKey, secretKey }, // noise keypair used for the connection
onauthenticate (remotePublicKey, done) // the onauthenticate hook to verify remote key pairs
}
Expand Down
41 changes: 17 additions & 24 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const pump = require('pump')
const hyperswarm = require('hyperswarm')
const Hyperswarm = require('hyperswarm')
const Protocol = require('hypercore-protocol')
const { EventEmitter } = require('events')

Expand All @@ -26,15 +26,7 @@ module.exports = class Replicator extends EventEmitter {
constructor (options = {}) {
super()

this.swarm = hyperswarm({
announceLocalAddress: !!options.announceLocalAddress,
preferredPort: 49737,
bootstrap: options.bootstrap,
queue: {
multiplex: true
}
})

this.swarm = new Hyperswarm(options)
this.createStream = options.createStream || ((init) => new Protocol(init, options))
this.swarm.on('connection', this._onconnection.bind(this))
this.swarming = new Map()
Expand Down Expand Up @@ -72,7 +64,10 @@ module.exports = class Replicator extends EventEmitter {
stream.on('discovery-key', this._ondiscoverykey.bind(this, stream))

this.streams.add(stream)
stream.on('close', () => this.streams.delete(stream))
stream.on('close', () => {
this.streams.delete(stream)
this.emit('delete', info, stream)
})
}

_ondiscoverykey (stream, discoveryKey) {
Expand All @@ -92,32 +87,30 @@ module.exports = class Replicator extends EventEmitter {
this.swarm.listen()
}

destroy () {
return new Promise((resolve, reject) => {
this.swarm.destroy((err) => {
if (err) return reject(err)
this.emit('close')
resolve()
})
})
async destroy () {
await this.swarm.clear()
await this.swarm.destroy()
this.emit('close')
}

async add (core, options = {}) {
await ready(core)

const key = core.discoveryKey.toString('hex')
const { announce, lookup } = options
const defaultLookup = lookup === undefined && announce === undefined
const { server, client } = options
const defaultClient = client === undefined && server === undefined
const added = this.swarming.has(key)

const one = new Event()
const all = new Event()

this.swarming.set(key, { core, options, one, all })

if (announce || lookup || defaultLookup) {
this.swarm.join(core.discoveryKey, { announce: !!announce, lookup: !!lookup || defaultLookup })
this.swarm.flush(onflush)
if (server || client || defaultClient) {
const discovery = this.swarm.join(core.discoveryKey, { server: !!server, client: !!client || defaultClient })
server && await discovery.flushed()
client && await this.swarm.flush()
onflush()
} else {
onflush()
}
Expand Down
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
"license": "MIT",
"dependencies": {
"hypercore-protocol": "^8.0.7",
"hyperswarm": "^2.13.0",
"hyperswarm": "^3.0.3",
"pump": "^3.0.0"
},
"devDependencies": {
"dht-rpc": "^4.9.6",
"hypercore": "^9.7.5",
"@hyperswarm/dht": "^5.0.17",
"hypercore": "^9.12.0",
"random-access-memory": "^3.1.2",
"standard": "^14.3.1",
"standard": "^16.0.4",
"tape": "^5.1.1"
},
"repository": {
Expand Down
50 changes: 24 additions & 26 deletions test/basic.js
Original file line number Diff line number Diff line change
@@ -1,58 +1,56 @@
const hypercore = require('hypercore')
const Hypercore = require('hypercore')
const ram = require('random-access-memory')
const { get, append, test } = require('./helpers')

test('basic', async function (t, replicator, clone) {
const core = hypercore(ram)
const a = new Hypercore(ram)
await replicator.add(a, { server: true, client: false })

replicator.add(core, { announce: true, lookup: false })
await append(a, 'test')

await append(core, 'test')
const aClone = new Hypercore(ram, a.key)
await clone.add(aClone, { client: true, server: false })

const coreClone = hypercore(ram, core.key)

clone.add(coreClone, { lookup: true, announce: false })

t.same(await get(coreClone, 0), Buffer.from('test'))
t.same(await get(aClone, 0), Buffer.from('test'))
})

test('multi core swarm', async function (t, replicator, clone) {
const a = hypercore(ram)
const b = hypercore(ram)
const a = new Hypercore(ram)
const b = new Hypercore(ram)

replicator.add(a, { announce: true, lookup: false })
replicator.add(b, { announce: true, lookup: false })
await replicator.add(a, { server: true, client: false })
await replicator.add(b, { server: true, client: false })

await append(a, 'a test')
await append(b, 'b test')

const aClone = hypercore(ram, a.key)
const bClone = hypercore(ram, b.key)
const aClone = new Hypercore(ram, a.key)
const bClone = new Hypercore(ram, b.key)

clone.add(bClone, { lookup: true, announce: false })
clone.add(aClone, { lookup: true, announce: false })
clone.add(bClone, { client: true, server: false })
clone.add(aClone, { client: true, server: false })

t.same(await get(aClone, 0), Buffer.from('a test'))
t.same(await get(bClone, 0), Buffer.from('b test'))
})

test('multi core swarm higher latency', async function (t, replicator, clone) {
const a = hypercore(ram)
const b = hypercore(ram)
const a = new Hypercore(ram)
const b = new Hypercore(ram)

replicator.add(a, { announce: true, lookup: false })
await replicator.add(a, { server: true, client: false })

await append(a, 'a test')
await append(b, 'b test')

const aClone = hypercore(ram, a.key)
const bClone = hypercore(ram, b.key)
const aClone = new Hypercore(ram, a.key)
const bClone = new Hypercore(ram, b.key)

clone.add(bClone, { lookup: true, announce: false })
clone.add(aClone, { lookup: true, announce: false })
await clone.add(bClone, { client: true, server: false })
await clone.add(aClone, { client: true, server: false })

replicator.on('discovery-key', function () {
replicator.add(b, { announce: true, lookup: false })
replicator.on('discovery-key', () => {
replicator.add(b, { server: true, client: false })
})

t.same(await get(aClone, 0), Buffer.from('a test'))
Expand Down
84 changes: 54 additions & 30 deletions test/helpers/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
const dht = require('dht-rpc')
const HyperDHT = require('@hyperswarm/dht')
const tape = require('tape')
const Replicator = require('../../')

async function test (msg, fn) {
tape(msg, async t => {
const [bootstrap, destroyBootstrap] = await createBootstrap()
const replicators = fill(2).map(() =>
new Replicator({ bootstrap })
)
await fn(t, ...replicators)
await destroy(replicators)
await destroyBootstrap()
})
}

module.exports = { get, append, ready, test }

function get (core, seq) {
Expand Down Expand Up @@ -31,36 +43,48 @@ function ready (core) {
})
}

function test (msg, fn) {
tape(msg, function (t) {
return new Promise((resolve, reject) => {
const bootstraper = dht({ ephemeral: true })

bootstraper.listen(0, async function () {
const replicators = [makeReplicator(), makeReplicator()]
async function createBootstrap () {
const bootstrappers = fill(2).map(() =>
new HyperDHT({
ephemeral: true,
bootstrap: []
})
)
await init(bootstrappers)
const bootstrap = bootstrappers.map(node => ({
host: '127.0.0.1',
port: node.address().port
}))
const nodes = fill(2).map(() =>
new HyperDHT({
ephemeral: false,
bootstrap
})
)
await init(nodes)
return [bootstrap, () => destroy(bootstrappers, nodes)]
}

let missing = replicators.length
for (const r of replicators) {
r.on('close', () => {
if (--missing > 0) return
bootstraper.destroy()
})
}
async function init (...nodes) {
return await Promise.all(nodes.map(async node => {
if (Array.isArray(node)) {
await init(...node)
} else {
await node.ready()
}
}))
}

try {
await fn(t, ...replicators)
resolve()
} catch (err) {
reject(err)
} finally {
for (const r of replicators) r.destroy()
}
async function destroy (...nodes) {
return await Promise.all(nodes.map(async node => {
if (Array.isArray(node)) {
await destroy(...node)
} else {
await node.destroy()
}
}))
}

function makeReplicator () {
const bootstrap = ['localhost:' + bootstraper.address().port]
return new Replicator({ bootstrap })
}
})
})
})
function fill (n) {
return Array(n).fill(null)
}