Skip to content

Commit

Permalink
fix(hubble): reduce hub bandwidth with floodsub toggle (#1851)
Browse files Browse the repository at this point in the history
## Motivation

- Hub bandwidth usage has increased to around 400-500 KiB per second, or
40-50 GiB per day
- Needless bandwidth utilization can increase resource requirements to
run Hub

## Context
- The default settings for our libp2p library publish to floodsub and
subscribe to floodsub topic
- Floodsub is a p2p protocol where every message a node receives gets
forwarded to all known peers in the network
- While there are benefits to Floodsub, it has significant network
amplification factors and hits scalability limits as the number of hubs
in the network grows

## Change Summary

- By default, disable `floodPublish` and `fallbackToFloodsub` 
- Expose environment variables `GOSSIPSUB_FALLBACK_TO_FLOODSUB` and
`GOSSIPSUB_FLOOD_PUBLISH` to toggle the values for gossipsub p2p

## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.
- [x] All [commits have been
signed](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#22-signing-commits)

## Additional Context

If this is a relatively large or complex change, provide more details
here that will help reviewers

<!-- start pr-codex -->

---

## PR-Codex overview
This PR reduces hub bandwidth in the `hubble` app by adding toggles for
`GOSSIPSUB_FALLBACK_TO_FLOODSUB` and `GOSSIPSUB_FLOOD_PUBLISH`.

### Detailed summary
- Added toggles for `GOSSIPSUB_FALLBACK_TO_FLOODSUB` and
`GOSSIPSUB_FLOOD_PUBLISH` in `gossipNode.ts`
- Refactored callback handling in `server.ts`
- Added environment variable checks for `fallbackToFloodsub` and
`floodPublish` in `gossipNodeWorker.ts`
- Added stats tracking for message sizes in `gossipNodeWorker.ts`

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
  • Loading branch information
Wazzymandias authored Mar 28, 2024
1 parent b754a8c commit ccd4d96
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 5 deletions.
5 changes: 5 additions & 0 deletions .changeset/flat-seahorses-pay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

fix(hubble): reduce hub bandwidth, can be toggled with GOSSIPSUB_FALLBACK_TO_FLOODSUB and GOSSIPSUB_FLOOD_PUBLISH
2 changes: 2 additions & 0 deletions apps/hubble/src/network/p2p/gossipNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,8 @@ export class GossipNode extends TypedEmitter<NodeEvents> {
data = Buffer.from(Object.values(detail.msg.data as unknown as Record<string, number>));
}

statsd().gauge("gossip.message_size_bytes", data.length, { topic: detail.msg.topic });

const tags: { [key: string]: string } = {
topic: detail.msg.topic,
};
Expand Down
22 changes: 18 additions & 4 deletions apps/hubble/src/network/p2p/gossipNodeWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,25 @@ export class LibP2PNode {
: LIBP2P_CONNECT_TIMEOUT_MS;
}

const fallbackToFloodsub = process.env["GOSSIPSUB_FALLBACK_TO_FLOODSUB"]
? process.env["GOSSIPSUB_FALLBACK_TO_FLOODSUB"] === "true"
: false;

const floodPublish = process.env["GOSSIPSUB_FLOOD_PUBLISH"]
? process.env["GOSSIPSUB_FLOOD_PUBLISH"] === "true"
: false;

const gossip = gossipsub({
emitSelf: false,
gossipsubIWantFollowupMs: gossipsubIWantFollowupMs,
allowPublishToZeroPeers: true,
asyncValidation: true, // Do not forward messages until we've merged it (prevents forwarding known bad messages)
canRelayMessage: true,
directPeers: options.directPeers || [],
emitSelf: false,
fallbackToFloodsub: fallbackToFloodsub,
floodPublish: floodPublish,
gossipsubIWantFollowupMs: gossipsubIWantFollowupMs,
globalSignaturePolicy: options.strictNoSign ? "StrictNoSign" : "StrictSign",
msgIdFn: this.getMessageId.bind(this),
directPeers: options.directPeers || [],
canRelayMessage: true,
seenTTL: GOSSIP_SEEN_TTL, // Bump up the default to handle large flood of messages. 2 mins was not sufficient to prevent a loop
scoreThresholds: { ...options.scoreThresholds },
scoreParams: {
Expand Down Expand Up @@ -663,6 +673,8 @@ parentPort?.on("message", async (msg: LibP2PNodeMethodGenericMessage) => {
const specificMsg = msg as LibP2PNodeMessage<"gossipMessage">;
const [message] = specificMsg.args;

statsd().gauge("gossip.worker.gossip_message_size_bytes", message.length, 1, { method: "gossipSubmitMessage" });

const publishResult = Result.combine(await libp2pNode.gossipMessage(Message.decode(message)));
const flattenedPeerIds = publishResult.isOk() ? publishResult.value.flatMap((r) => r.recipients) : [];

Expand All @@ -681,6 +693,8 @@ parentPort?.on("message", async (msg: LibP2PNodeMethodGenericMessage) => {
const specificMsg = msg as LibP2PNodeMessage<"gossipContactInfo">;
const [contactInfo] = specificMsg.args;

statsd().gauge("gossip.worker.gossip_message_size_bytes", contactInfo.length, 1, { method: "gossipContactInfo" });

const publishResult = Result.combine(await libp2pNode.gossipContactInfo(ContactInfoContent.decode(contactInfo)));
const flattenedPeerIds = publishResult.isOk() ? publishResult.value.flatMap((r) => r.recipients) : [];

Expand Down
3 changes: 2 additions & 1 deletion apps/hubble/src/rpc/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,8 @@ export default class Server {
messages = messages.filter((message) => message.data !== undefined && message.hash.length > 0);
}

callback(null, MessagesResponse.create({ messages }));
const response = MessagesResponse.create({ messages });
callback(null, response);
},
(err: HubError) => {
callback(toServiceError(err));
Expand Down

0 comments on commit ccd4d96

Please sign in to comment.