Skip to content

Commit

Permalink
feat: Include network id in gossip topics to keep networks isolated (#…
Browse files Browse the repository at this point in the history
…861)

* chore: cleaner logging when ignoring peers

* feat: Include network id in gossip topics to keep peers homogeneous

* Add changeset
  • Loading branch information
sanjayprabhu authored Apr 12, 2023
1 parent c5c0d98 commit 34fe54f
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 38 deletions.
5 changes: 5 additions & 0 deletions .changeset/spotty-dryers-kneel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@farcaster/hubble': minor
---

Include network id in gossip topics to keep networks isolated
39 changes: 27 additions & 12 deletions apps/hubble/src/hubble.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import { Multiaddr, multiaddr } from '@multiformats/multiaddr';
import { Result, ResultAsync, err, ok } from 'neverthrow';
import { EthEventsProvider, GoerliEthConstants } from '~/eth/ethEventsProvider';
import { GossipNode, MAX_MESSAGE_QUEUE_SIZE } from '~/network/p2p/gossipNode';
import { NETWORK_TOPIC_CONTACT, NETWORK_TOPIC_PRIMARY } from '~/network/p2p/protocol';
import { PeriodicSyncJobScheduler } from '~/network/sync/periodicSyncJob';
import SyncEngine from '~/network/sync/syncEngine';
import AdminServer from '~/rpc/adminServer';
Expand Down Expand Up @@ -200,7 +199,7 @@ export class Hub implements HubInterface {
constructor(options: HubOptions) {
this.options = options;
this.rocksDB = new RocksDB(options.rocksDBName ? options.rocksDBName : randomDbName());
this.gossipNode = new GossipNode();
this.gossipNode = new GossipNode(this.options.network);

// Create the ETH registry provider, which will fetch ETH events and push them into the engine.
// Defaults to Goerli testnet, which is currently used for Production Farcaster Hubs.
Expand Down Expand Up @@ -563,14 +562,7 @@ export class Hub implements HubInterface {
return;
}

// Ignore peers that are below the minimum supported version.
const theirVersion = message.hubVersion;
const versionCheckResult = ensureAboveMinFarcasterVersion(theirVersion);
if (versionCheckResult.isErr() || message.network !== this.options.network) {
log.warn(
{ peerId, theirVersion, theirNetwork: message.network },
'Peer is running an invalid or outdated version, ignoring'
);
if (!(await this.isValidPeer(peerId, message))) {
await this.gossipNode.removePeerFromAddressBook(peerId);
this.syncEngine.removeContactInfoForPeerId(peerId.toString());
return;
Expand Down Expand Up @@ -684,8 +676,8 @@ export class Hub implements HubInterface {

private registerEventHandlers() {
// Subscribes to all relevant topics
this.gossipNode.gossip?.subscribe(NETWORK_TOPIC_PRIMARY);
this.gossipNode.gossip?.subscribe(NETWORK_TOPIC_CONTACT);
this.gossipNode.gossip?.subscribe(this.gossipNode.primaryTopic());
this.gossipNode.gossip?.subscribe(this.gossipNode.contactInfoTopic());

this.gossipNode.on('message', async (_topic, message) => {
await message.match(
Expand Down Expand Up @@ -835,6 +827,29 @@ export class Hub implements HubInterface {

return ResultAsync.fromPromise(this.rocksDB.commit(txn), (e) => e as HubError);
}
async isValidPeer(ourPeerId: PeerId, message: ContactInfoContent) {
const theirVersion = message.hubVersion;
const theirNetwork = message.network;

const versionCheckResult = ensureAboveMinFarcasterVersion(theirVersion);
if (versionCheckResult.isErr()) {
log.warn(
{ peerId: ourPeerId, theirVersion, ourVersion: FARCASTER_VERSION, errMsg: versionCheckResult.error.message },
'Peer is running an outdated version, ignoring'
);
return false;
}

if (theirNetwork !== this.options.network) {
log.warn(
{ peerId: ourPeerId, theirNetwork, ourNetwork: this.options.network },
'Peer is running a different network, ignoring'
);
return false;
}

return true;
}

/* -------------------------------------------------------------------------- */
/* Test API */
Expand Down
42 changes: 35 additions & 7 deletions apps/hubble/src/network/p2p/gossipNode.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import { gossipsub, GossipSub } from '@chainsafe/libp2p-gossipsub';
import { noise } from '@chainsafe/libp2p-noise';
import { ContactInfoContent, GossipMessage, HubAsyncResult, HubError, HubResult, Message } from '@farcaster/hub-nodejs';
import {
ContactInfoContent,
FarcasterNetwork,
GossipMessage,
HubAsyncResult,
HubError,
HubResult,
Message,
} from '@farcaster/hub-nodejs';
import { Connection } from '@libp2p/interface-connection';
import { PeerId } from '@libp2p/interface-peer-id';
import { mplex } from '@libp2p/mplex';
Expand All @@ -11,7 +19,6 @@ import { createLibp2p, Libp2p } from 'libp2p';
import { err, ok, Result, ResultAsync } from 'neverthrow';
import { TypedEmitter } from 'tiny-typed-emitter';
import { ConnectionFilter } from '~/network/p2p/connectionFilter';
import { GOSSIP_TOPICS, NETWORK_TOPIC_PRIMARY } from '~/network/p2p/protocol';
import { logger } from '~/utils/logger';
import { addressInfoFromParts, checkNodeAddrs, ipMultiAddrStrFromAddressInfo } from '~/utils/p2p';
import { PeriodicPeerCheckScheduler } from './periodicPeerCheck';
Expand Down Expand Up @@ -57,6 +64,12 @@ interface NodeOptions {
export class GossipNode extends TypedEmitter<NodeEvents> {
private _node?: Libp2p;
private _periodicPeerCheckJob?: PeriodicPeerCheckScheduler;
private _network: FarcasterNetwork;

constructor(network?: FarcasterNetwork) {
super();
this._network = network ?? FarcasterNetwork.NONE;
}

/** Returns the PeerId (public key) of this node */
get peerId() {
Expand Down Expand Up @@ -174,7 +187,7 @@ export class GossipNode extends TypedEmitter<NodeEvents> {
async gossipMessage(message: Message) {
const gossipMessage = GossipMessage.create({
message,
topics: [NETWORK_TOPIC_PRIMARY],
topics: [this.primaryTopic()],
peerId: this.peerId?.toBytes() ?? new Uint8Array(),
});
await this.publish(gossipMessage);
Expand All @@ -184,7 +197,7 @@ export class GossipNode extends TypedEmitter<NodeEvents> {
async gossipContactInfo(contactInfo: ContactInfoContent) {
const gossipMessage = GossipMessage.create({
contactInfoContent: contactInfo,
topics: [NETWORK_TOPIC_PRIMARY],
topics: [this.contactInfoTopic()],
peerId: this.peerId?.toBytes() ?? new Uint8Array(),
});
await this.publish(gossipMessage);
Expand Down Expand Up @@ -252,7 +265,7 @@ export class GossipNode extends TypedEmitter<NodeEvents> {
});
this.gossip?.addEventListener('message', (event) => {
// ignore messages not in our topic lists (e.g. GossipSub peer discovery messages)
if (GOSSIP_TOPICS.includes(event.detail.topic)) {
if (this.gossipTopics().includes(event.detail.topic)) {
this.emit('message', event.detail.topic, GossipNode.decodeMessage(event.detail.data));
}
});
Expand All @@ -269,7 +282,10 @@ export class GossipNode extends TypedEmitter<NodeEvents> {
log.info({ identity: this.identity }, `Disconnected from: ${event.detail.remotePeer.toString()} `);
});
this.gossip?.addEventListener('message', (event) => {
log.info({ identity: this.identity }, `Received message for topic: ${event.detail.topic}`);
log.info(
{ identity: this.identity, from: (event.detail as any)['from'] },
`Received message for topic: ${event.detail.topic}`
);
});
this.gossip?.addEventListener('subscription-change', (event) => {
log.info(
Expand All @@ -281,6 +297,17 @@ export class GossipNode extends TypedEmitter<NodeEvents> {
});
}

primaryTopic() {
return `f_network_${this._network}_primary`;
}
contactInfoTopic() {
return `f_network_${this._network}_contact_info`;
}

gossipTopics() {
return [this.primaryTopic(), this.contactInfoTopic()];
}

//TODO: Needs better typesafety
static encodeMessage(message: GossipMessage): HubResult<Uint8Array> {
return ok(GossipMessage.encode(message).finish());
Expand Down Expand Up @@ -315,6 +342,7 @@ export class GossipNode extends TypedEmitter<NodeEvents> {
const listenIPMultiAddr = options.ipMultiAddr ?? MultiaddrLocalHost;
const listenPort = options.gossipPort ?? 0;
const listenMultiAddrStr = `${listenIPMultiAddr}/tcp/${listenPort}`;
const peerDiscoveryTopic = `_farcaster.${this._network}.peer_discovery`;

let announceMultiAddrStrList: string[] = [];
if (options.announceIp && options.gossipPort) {
Expand Down Expand Up @@ -358,7 +386,7 @@ export class GossipNode extends TypedEmitter<NodeEvents> {
streamMuxers: [mplex()],
connectionEncryption: [noise()],
pubsub: gossip,
peerDiscovery: [pubsubPeerDiscovery()],
peerDiscovery: [pubsubPeerDiscovery({ topics: [peerDiscoveryTopic] })],
}),
(e) => new HubError('unavailable', { message: 'failed to create libp2p node', cause: e as Error })
);
Expand Down
12 changes: 0 additions & 12 deletions apps/hubble/src/network/p2p/protocol.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,4 @@
import { GossipVersion } from '@farcaster/hub-nodejs';

// Network topic for Farcaster Messages
export const NETWORK_TOPIC_PRIMARY = 'f_network_topic_primary';

// Network topic for Gossip Node ContactInfo messages
export const NETWORK_TOPIC_CONTACT = 'f_network_topic_contact';

// The rate at which nodes republish ContactInfo
export const GOSSIP_CONTACT_INTERVAL = 10_000;

// List of all valid Gossip Topics
export const GOSSIP_TOPICS = [NETWORK_TOPIC_CONTACT, NETWORK_TOPIC_PRIMARY];

// Current gossip protocol version
export const GOSSIP_PROTOCOL_VERSION = GossipVersion.V1;
3 changes: 1 addition & 2 deletions apps/hubble/src/network/utils/factories.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import {
import { PeerId } from '@libp2p/interface-peer-id';
import { createEd25519PeerId } from '@libp2p/peer-id-factory';
import { Factory } from 'fishery';
import { NETWORK_TOPIC_PRIMARY } from '~/network/p2p/protocol';
import { HASH_LENGTH, SyncId } from '~/network/sync/syncId';

const GossipAddressInfoFactory = Factory.define<GossipAddressInfo>(() => {
Expand Down Expand Up @@ -42,7 +41,7 @@ const GossipMessageFactory = Factory.define<GossipMessage, { peerId?: PeerId },
return GossipMessage.create({
peerId: transientParams.peerId ? transientParams.peerId.toBytes() : new Uint8Array(),
message: Factories.Message.build(),
topics: [NETWORK_TOPIC_PRIMARY],
topics: ['f_network_0_primary'],
version: GossipVersion.V1,
});
}
Expand Down
9 changes: 4 additions & 5 deletions apps/hubble/src/test/e2e/gossipNetwork.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { GossipNode } from '~/network/p2p/gossipNode';
import { NETWORK_TOPIC_PRIMARY } from '~/network/p2p/protocol';
import { sleep } from '~/utils/crypto';
import { NetworkFactories } from '../../network/utils/factories';
import { GossipMessage } from '@farcaster/hub-nodejs';
Expand Down Expand Up @@ -41,9 +40,9 @@ describe('gossip network tests', () => {
const result = await n.connect(nodes[0] as GossipNode);
expect(result.isOk()).toBeTruthy();
}

const primaryTopic = nodes[0]!.primaryTopic();
// Subscribe each node to the test topic
nodes.forEach((n) => n.gossip?.subscribe(NETWORK_TOPIC_PRIMARY));
nodes.forEach((n) => n.gossip?.subscribe(primaryTopic));

// Sleep 5 heartbeats to let the gossipsub network form
await sleep(PROPAGATION_DELAY);
Expand Down Expand Up @@ -82,8 +81,8 @@ describe('gossip network tests', () => {
nonSenderNodes.map((n) => {
const topics = messageStore.get(n.peerId?.toString() ?? '');
expect(topics).toBeDefined();
expect(topics?.has(NETWORK_TOPIC_PRIMARY)).toBeTruthy();
const topicMessages = topics?.get(NETWORK_TOPIC_PRIMARY) ?? [];
expect(topics?.has(primaryTopic)).toBeTruthy();
const topicMessages = topics?.get(primaryTopic) ?? [];
expect(topicMessages.length).toBe(1);
expect(topicMessages[0]).toEqual(message);
});
Expand Down

0 comments on commit 34fe54f

Please sign in to comment.