diff --git a/.changeset/hungry-hounds-live.md b/.changeset/hungry-hounds-live.md new file mode 100644 index 0000000..6611350 --- /dev/null +++ b/.changeset/hungry-hounds-live.md @@ -0,0 +1,5 @@ +--- +"@xmtp/broadcast-sdk": minor +--- + +Added Broadcast SDK diff --git a/README.md b/README.md index 97bad73..5c0a1a1 100644 --- a/README.md +++ b/README.md @@ -10,3 +10,4 @@ This repo provides a collection of tools for running high-quality XMTP bots in N - [Fs Persistence](./packages/fs-persistence/README.md): Provides file system-based data persistence for XMTP clients, enabling data storage and retrieval directly from the file system. - [Redis Persistence](./packages/redis-persistence/README.md): Implements Redis-based persistence for XMTP clients, supporting efficient data storage and access in a Redis database. - [CLI Starter](./packages/cli-starter/README.md): It includes a basic setup and examples to get started with building a command-line interface for XMTP. +- [Broadcast Client](./packages/broadcast/README.md): It includes a basic setup and examples to get started with building a command-line interface for XMTP. diff --git a/packages/broadcast/README.md b/packages/broadcast/README.md new file mode 100644 index 0000000..dec858a --- /dev/null +++ b/packages/broadcast/README.md @@ -0,0 +1,21 @@ +# Broadcast SDK + +## Installation +``` +yarn add @xmtp/broadcast-sdk +``` + +## Usage +```ts +import { Client } from "@xmtp/xmtp-js" +import { BroadcastClient } from "@xmtp/broadcast-sdk" +// It is highly recommended to use the GRPC client +const client = await Client.create(wallet) + +const broadcastClient = new BroadcastClient({ + client, + addresses: ["0x1234", "0x5678"], + cachedCanMessageAddresses: ["0x1234"], +}) +broadcastClient.broadcast(['Hello!']) +``` diff --git a/packages/broadcast/package.json b/packages/broadcast/package.json new file mode 100644 index 0000000..177197e --- /dev/null +++ b/packages/broadcast/package.json @@ -0,0 +1,48 @@ +{ + "name": "@xmtp/broadcast-sdk", + "version": "0.1.0", + "description": "Helper package for broadcasting XMTP messages to subscribers", + "type": "module", + "main": "dist/index.cjs", + "module": "dist/index.js", + "types": "dist/index.d.ts", + "exports": { + ".": { + "types": "./dist/index.d.ts", + "import": "./dist/index.js", + "require": "./dist/index.cjs" + } + }, + "engines": { + "node": ">=18" + }, + "files": [ + "dist" + ], + "publishConfig": { + "access": "public", + "provenance": true + }, + "scripts": { + "clean": "rm -rf dist", + "build": "yarn clean && rollup -c", + "prepublishOnly": "yarn build", + "test": "vitest run ./src" + }, + "author": "XMTP Labs ", + "license": "MIT", + "bugs": { + "url": "https://github.com/xmtp/xmtp-node-js-tools/issues" + }, + "homepage": "https://github.com/xmtp/xmtp-node-js-tools#readme", + "packageManager": "yarn@4.0.0", + "devDependencies": { + "@rollup/plugin-typescript": "^11.1.6", + "@xmtp/xmtp-js": "^11.3.12", + "ethers": "^6.10.0", + "rollup": "^4.13.0", + "rollup-plugin-dts": "^6.1.0", + "typescript": "^5.4.5", + "vitest": "^1.0.1" + } +} diff --git a/packages/broadcast/rollup.config.js b/packages/broadcast/rollup.config.js new file mode 100644 index 0000000..6fbea6a --- /dev/null +++ b/packages/broadcast/rollup.config.js @@ -0,0 +1,43 @@ +import typescript from "@rollup/plugin-typescript" +import { defineConfig } from "rollup" +import { dts } from "rollup-plugin-dts" + +const external = ["@xmtp/proto", "node:crypto", "@xmtp/xmtp-js", "long"] + +const plugins = [ + typescript({ + declaration: false, + declarationMap: false, + }), +] + +export default defineConfig([ + { + input: "src/index.ts", + output: { + file: "dist/index.js", + format: "es", + sourcemap: true, + }, + plugins, + external, + }, + { + input: "src/index.ts", + output: { + file: "dist/index.cjs", + format: "cjs", + sourcemap: true, + }, + plugins, + external, + }, + { + input: "src/index.ts", + output: { + file: "dist/index.d.ts", + format: "es", + }, + plugins: [dts()], + }, +]) diff --git a/packages/broadcast/src/BroadcastClient.ts b/packages/broadcast/src/BroadcastClient.ts new file mode 100644 index 0000000..8ca32d4 --- /dev/null +++ b/packages/broadcast/src/BroadcastClient.ts @@ -0,0 +1,298 @@ +import { type Client, type Conversation } from "@xmtp/xmtp-js" + +import { BroadcastConstructorParams, BroadcastOptions } from "./types" + +const GENERAL_RATE_LIMIT = 10000 + +export class BroadcastClient { + client: Client + addresses: string[] + cachedCanMessageAddresses: Set + rateLimitAmount: number + rateLimitTime: number + batches: string[][] = [] + errorBatch: string[] = [] + conversationMapping: Map = new Map() + + // Callbacks + /** + * Called when a batch of addresses is about to be sent + */ + onBatchStart?: (addresses: string[]) => void + /** + * Called when a batch of addresses has been sent/failed + */ + onBatchComplete?: (addresses: string[]) => void + /** + * Called when all addresses have been sent/failed + */ + onBroadcastComplete?: () => void + /** + * Called when an address can't be messaged + */ + onCantMessageAddress?: (address: string) => void + /** + * Called when a message is about to be sent + */ + onMessageSending?: (address: string) => void + /** + * Called when a message fails to send + */ + onMessageFailed?: (address: string) => void + /** + * Called when a message is successfully sent + */ + onMessageSent?: (address: string) => void + /** + * Called when the list of addresses that can be messaged is updated, this is useful for caching + */ + onCanMessageAddressesUpdate?: (addresses: string[]) => void + /** + * Called when a delay is about to happen + */ + onDelay?: (ms: number) => void + + constructor({ + client, + addresses, + cachedCanMessageAddresses, + rateLimitAmount = 1000, + rateLimitTime = 1000 * 60 * 5, + onBatchStart, + onBatchComplete, + onBroadcastComplete, + onCantMessageAddress, + onMessageSending, + onMessageFailed, + onMessageSent, + onCanMessageAddressesUpdate, + onDelay, + }: BroadcastConstructorParams) { + this.client = client + this.addresses = addresses + this.cachedCanMessageAddresses = new Set(cachedCanMessageAddresses) + this.rateLimitAmount = rateLimitAmount + this.rateLimitTime = rateLimitTime + this.onBatchStart = onBatchStart + this.onBatchComplete = onBatchComplete + this.onBroadcastComplete = onBroadcastComplete + this.onCantMessageAddress = onCantMessageAddress + this.onMessageSending = onMessageSending + this.onMessageFailed = onMessageFailed + this.onMessageSent = onMessageSent + this.onCanMessageAddressesUpdate = onCanMessageAddressesUpdate + this.onDelay = onDelay + } + + public broadcast = async ( + messages: Exclude[], + options: BroadcastOptions, + ) => { + const skipInitialDelay = options.skipInitialDelay ?? false + const client = this.client + const conversations = await client.conversations.list() + for (const conversation of conversations) { + this.conversationMapping.set(conversation.peerAddress, conversation) + } + if ( + !skipInitialDelay && + conversations.length / 2 > GENERAL_RATE_LIMIT - this.rateLimitAmount + ) { + await this.delay() + } + + this.batches = this.getBatches(messages.length) + for (let batchIndex = 0; batchIndex < this.batches.length; batchIndex++) { + await this.handleBatch({ + addresses: this.batches[batchIndex], + messages, + }) + if (batchIndex !== this.batches.length - 1) { + await this.delay() + } else { + await this.sendErrorBatch(messages) + } + } + this.onBroadcastComplete?.() + } + + private handleBatch = async ({ + addresses, + messages, + }: { + addresses: string[] + messages: ContentTypes[] + }) => { + this.onBatchStart?.(addresses) + await this.canMessageAddresses(addresses, this.onCanMessageAddressesUpdate) + for (const address of addresses) { + if (!this.cachedCanMessageAddresses.has(address)) { + this.onCantMessageAddress?.(address) + continue + } + try { + let conversation = this.conversationMapping.get(address) + if (!conversation) { + conversation = + await this.client.conversations.newConversation(address) + this.conversationMapping.set(address, conversation) + } + + for (const message of messages) { + this.onMessageSending?.(address) + await conversation.send(message) + } + this.onMessageSent?.(address) + // Clear up some memory after we are done with the conversation + this.cachedCanMessageAddresses.delete(address) + this.conversationMapping.delete(address) + } catch (err) { + console.error(err) + this.onMessageFailed?.(address) + this.errorBatch.push(address) + await this.delay() + } + } + this.onBatchComplete?.(addresses) + } + + private sendErrorBatch = async ( + messages: Exclude[], + ) => { + if (this.errorBatch.length === 0) { + return + } + const finalErrors: string[] = [] + for (const address of this.errorBatch) { + try { + const conversation = + await this.client.conversations.newConversation(address) + for (const message of messages) { + await conversation.send(message) + } + this.onMessageSent?.(address) + } catch (err) { + this.onMessageFailed?.(address) + await this.delay() + } + } + this.errorBatch = finalErrors + } + + private canMessageAddresses = async ( + addresses: string[], + onCanMessageAddressesUpdate?: (newAddresses: string[]) => void, + ) => { + const unknownStateAddresses: string[] = [] + for (let i = 0; i < addresses.length; i++) { + if (!this.cachedCanMessageAddresses.has(addresses[i])) { + unknownStateAddresses.push(addresses[i]) + } + } + const canMessageAddresses = await this.client.canMessage( + unknownStateAddresses, + ) + const newCanMessageAddresses: string[] = [] + for (let i = 0; i < addresses.length; i++) { + if (canMessageAddresses[i]) { + newCanMessageAddresses.push(addresses[i]) + this.cachedCanMessageAddresses.add(addresses[i]) + } + } + onCanMessageAddressesUpdate?.(newCanMessageAddresses) + } + + private delay = async () => { + const ms = this.rateLimitTime + this.onDelay?.(ms) + return new Promise((resolve) => setTimeout(resolve, ms)) + } + + private getBatches = (messageCount: number): string[][] => { + let batch: string[] = [] + const batches: string[][] = [] + let batchCount = 0 + for (const address of this.addresses) { + let addressWeight = 0 + // No matter what we will want to send a message so this is the number of messages being sent + addressWeight += messageCount + if (!this.conversationMapping.has(address)) { + // this conversation will likely need to be created + // so we count it as 3 Posts + // 1. create user invite + // 2. create peer invite + // 3. allow contact + addressWeight += 3 + } else { + addressWeight += 1 + } + const newBatchCount = batchCount + addressWeight + if (newBatchCount === this.rateLimitAmount) { + batch.push(address) + batches.push(batch) + batch = [] + batchCount = 0 + } else if (newBatchCount > this.rateLimitAmount) { + batches.push(batch) + batch = [] + batch.push(address) + batchCount = addressWeight + } else { + batch.push(address) + batchCount = newBatchCount + } + } + if (batch.length > 0) { + batches.push(batch) + } + return batches + } + + setAddresses(addresses: string[]) { + this.addresses = addresses + } + + setRateLimitAmount(amount: number) { + this.rateLimitAmount = amount + } + + setRateLimitTime(time: number) { + this.rateLimitTime = time + } + + setOnBatchStart(callback: (addresses: string[]) => void) { + this.onBatchStart = callback + } + + setOnBatchComplete(callback: (addresses: string[]) => void) { + this.onBatchComplete = callback + } + + setOnBroadcastComplete(callback: () => void) { + this.onBroadcastComplete = callback + } + + setOnCantMessageAddress(callback: (address: string) => void) { + this.onCantMessageAddress = callback + } + + setOnMessageSending(callback: (address: string) => void) { + this.onMessageSending = callback + } + + setOnMessageFailed(callback: (address: string) => void) { + this.onMessageFailed = callback + } + + setOnMessageSent(callback: (address: string) => void) { + this.onMessageSent = callback + } + + setOnCanMessageAddressesUpdate(callback: (addresses: string[]) => void) { + this.onCanMessageAddressesUpdate = callback + } + + setOnDelay(callback: (ms: number) => void) { + this.onDelay = callback + } +} diff --git a/packages/broadcast/src/index.test.ts b/packages/broadcast/src/index.test.ts new file mode 100644 index 0000000..7ebc696 --- /dev/null +++ b/packages/broadcast/src/index.test.ts @@ -0,0 +1,166 @@ +// BroadcastClient.test.js +import { Client, Conversation } from "@xmtp/xmtp-js" // Adjust the import according to your file structure +import { beforeEach, describe, expect, it, vi } from "vitest" + +import { BroadcastClient } from "./BroadcastClient" // Adjust the import according to your file structure + +describe("BroadcastClient", () => { + let clientMock: Client + let conversationMock: Conversation + let broadcastClient: BroadcastClient + + beforeEach(() => { + conversationMock = { + send: vi.fn(), + } as unknown as Conversation + + clientMock = { + conversations: { + list: vi.fn().mockResolvedValue([ + { peerAddress: "address1", send: conversationMock.send }, + { peerAddress: "address2", send: conversationMock.send }, + ]), + newConversation: vi.fn().mockResolvedValue(conversationMock), + }, + canMessage: vi.fn().mockResolvedValue([true, true]), + } as unknown as Client + + broadcastClient = new BroadcastClient({ + client: clientMock, + addresses: ["address1", "address2"], + cachedCanMessageAddresses: [], + rateLimitAmount: 1000, + rateLimitTime: 1000 * 60 * 5, + }) + }) + + it("should call client.conversations.list and conversation.send", async () => { + const messages = ["message1", "message2"] + const options = { skipInitialDelay: true } + + await broadcastClient.broadcast(messages, options) + + expect(clientMock.conversations.list).toHaveBeenCalled() + expect(conversationMock.send).toHaveBeenCalledTimes(4) // 2 addresses * 2 messages each + }) + + it("should call onBatchStart callback", async () => { + const onBatchStart = vi.fn() + broadcastClient.setOnBatchStart(onBatchStart) + + const messages = ["message1", "message2"] + const options = { skipInitialDelay: true } + + await broadcastClient.broadcast(messages, options) + + expect(onBatchStart).toHaveBeenCalledWith(["address1", "address2"]) + }) + + it("should call onBatchComplete callback", async () => { + const onBatchComplete = vi.fn() + broadcastClient.setOnBatchComplete(onBatchComplete) + + const messages = ["message1", "message2"] + const options = { skipInitialDelay: true } + + await broadcastClient.broadcast(messages, options) + + expect(onBatchComplete).toHaveBeenCalledWith(["address1", "address2"]) + }) + + it("should call onBroadcastComplete callback", async () => { + const onBroadcastComplete = vi.fn() + broadcastClient.setOnBroadcastComplete(onBroadcastComplete) + + const messages = ["message1", "message2"] + const options = { skipInitialDelay: true } + + await broadcastClient.broadcast(messages, options) + + expect(onBroadcastComplete).toHaveBeenCalled() + }) + + it("should call onCantMessageAddress callback", async () => { + const onCantMessageAddress = vi.fn() + broadcastClient.setOnCantMessageAddress(onCantMessageAddress) + + // @ts-expect-error we're mocking the method + clientMock.canMessage.mockResolvedValue([false, false]) + + const messages = ["message1", "message2"] + const options = { skipInitialDelay: true } + + await broadcastClient.broadcast(messages, options) + + expect(onCantMessageAddress).toHaveBeenCalledWith("address1") + expect(onCantMessageAddress).toHaveBeenCalledWith("address2") + }) + + it("should call onMessageSending and onMessageSent callbacks", async () => { + const onMessageSending = vi.fn() + const onMessageSent = vi.fn() + broadcastClient.setOnMessageSending(onMessageSending) + broadcastClient.setOnMessageSent(onMessageSent) + + const messages = ["message1", "message2"] + const options = { skipInitialDelay: true } + + await broadcastClient.broadcast(messages, options) + + expect(onMessageSending).toHaveBeenCalledWith("address1") + expect(onMessageSending).toHaveBeenCalledWith("address2") + expect(onMessageSent).toHaveBeenCalledWith("address1") + expect(onMessageSent).toHaveBeenCalledWith("address2") + }) + + it("should call onMessageFailed callback", async () => { + const onMessageFailed = vi.fn() + broadcastClient.setRateLimitTime(1) + broadcastClient.setOnMessageFailed(onMessageFailed) + + // @ts-expect-error we're mocking the method + conversationMock.send.mockRejectedValue(new Error("Send failed")) + + const messages = ["message1", "message2"] + const options = { skipInitialDelay: true } + + await broadcastClient.broadcast(messages, options) + + expect(onMessageFailed).toHaveBeenCalledWith("address1") + expect(onMessageFailed).toHaveBeenCalledWith("address2") + }) + + it("should call onCanMessageAddressesUpdate callback", async () => { + const onCanMessageAddressesUpdate = vi.fn() + broadcastClient.setOnCanMessageAddressesUpdate(onCanMessageAddressesUpdate) + + const messages = ["message1", "message2"] + const options = { skipInitialDelay: true } + + await broadcastClient.broadcast(messages, options) + + expect(onCanMessageAddressesUpdate).toHaveBeenCalledWith([ + "address1", + "address2", + ]) + }) + + it("should call onDelay callback", async () => { + const testRateLimitAmount = 10 + const addresses = Array.from( + { length: testRateLimitAmount }, + (_, i) => `address${i}`, + ) + const onDelay = vi.fn() + broadcastClient.setAddresses(addresses) + broadcastClient.setOnDelay(onDelay) + broadcastClient.setRateLimitTime(1) + broadcastClient.setRateLimitAmount(testRateLimitAmount) + + const messages = ["message1", "message2"] + + await broadcastClient.broadcast(messages, {}) + + expect(onDelay).toHaveBeenCalledTimes(4) + }) +}) diff --git a/packages/broadcast/src/index.ts b/packages/broadcast/src/index.ts new file mode 100644 index 0000000..6f0f5d6 --- /dev/null +++ b/packages/broadcast/src/index.ts @@ -0,0 +1,2 @@ +export * from "./BroadcastClient" +export * from "./types" diff --git a/packages/broadcast/src/types.ts b/packages/broadcast/src/types.ts new file mode 100644 index 0000000..7f2d6a5 --- /dev/null +++ b/packages/broadcast/src/types.ts @@ -0,0 +1,25 @@ +import { type Client } from "@xmtp/xmtp-js" + +export interface BroadcastConstructorParams { + client: Client + addresses: string[] + cachedCanMessageAddresses: string[] + rateLimitAmount?: number + rateLimitTime?: number + + // Callbacks + onBatchStart?: (addresses: string[]) => void + onBatchComplete?: (addresses: string[]) => void + onBroadcastComplete?: () => void + onCantMessageAddress?: (address: string) => void + onCanMessageAddreses?: (addresses: string[]) => void + onMessageSending?: (address: string) => void + onMessageFailed?: (address: string) => void + onMessageSent?: (address: string) => void + onCanMessageAddressesUpdate?: (addresses: string[]) => void + onDelay?: (ms: number) => void +} + +export interface BroadcastOptions { + skipInitialDelay?: boolean +} diff --git a/packages/broadcast/tsconfig.json b/packages/broadcast/tsconfig.json new file mode 100644 index 0000000..f4bb6b8 --- /dev/null +++ b/packages/broadcast/tsconfig.json @@ -0,0 +1,8 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "dist/esm", + "composite": false + }, + "includes": ["src/**/*"] +} diff --git a/packages/broadcast/vitest.config.ts b/packages/broadcast/vitest.config.ts new file mode 100644 index 0000000..a1ab5a8 --- /dev/null +++ b/packages/broadcast/vitest.config.ts @@ -0,0 +1,17 @@ +import { defineConfig, mergeConfig } from "vite" +import { defineConfig as defineVitestConfig } from "vitest/config" + +// https://vitejs.dev/config/ +const viteConfig = defineConfig({ + plugins: [], +}) + +const vitestConfig = defineVitestConfig({ + test: { + globals: true, + environment: "node", + setupFiles: "./vitest.setup.ts", + }, +}) + +export default mergeConfig(viteConfig, vitestConfig) diff --git a/packages/broadcast/vitest.setup.ts b/packages/broadcast/vitest.setup.ts new file mode 100644 index 0000000..7c6dd09 --- /dev/null +++ b/packages/broadcast/vitest.setup.ts @@ -0,0 +1,5 @@ +import { webcrypto } from "crypto" + +// eslint-disable-next-line +// @ts-ignore +global.crypto = webcrypto diff --git a/yarn.lock b/yarn.lock index 598bbdb..d51dbc7 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3064,6 +3064,20 @@ __metadata: languageName: unknown linkType: soft +"@xmtp/broadcast-sdk@workspace:packages/broadcast": + version: 0.0.0-use.local + resolution: "@xmtp/broadcast-sdk@workspace:packages/broadcast" + dependencies: + "@rollup/plugin-typescript": "npm:^11.1.6" + "@xmtp/xmtp-js": "npm:^11.3.12" + ethers: "npm:^6.10.0" + rollup: "npm:^4.13.0" + rollup-plugin-dts: "npm:^6.1.0" + typescript: "npm:^5.4.5" + vitest: "npm:^1.0.1" + languageName: unknown + linkType: soft + "@xmtp/cli-starter@workspace:packages/cli-starter": version: 0.0.0-use.local resolution: "@xmtp/cli-starter@workspace:packages/cli-starter"