Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: socket-io client should not disconnect with no event reply #1800

Merged
merged 2 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions docs/socket-io/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,19 @@ export type Topic =
| NftAssetEventTopic
| NftCollectionEventTopic;

// Allows timeout callbacks for messages. See
// https://socket.io/docs/v4/typescript/#emitting-with-a-timeout
type WithTimeoutAck<isSender extends boolean, args extends any[]> = isSender extends true ? [Error, ...args] : args;

export interface ClientToServerMessages {
subscribe: (topic: Topic | Topic[], callback: (error: string | null) => void) => void;
unsubscribe: (...topic: Topic[]) => void;
}

export interface ServerToClientMessages<isSender extends boolean = false> {
block: (block: Block, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
microblock: (microblock: Microblock, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
mempool: (transaction: MempoolTransaction, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
'nft-event': (event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
[key: TransactionTopic]: (transaction: Transaction | MempoolTransaction, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
[key: NftAssetEventTopic]: (assetIdentifier: string, value: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
[key: NftCollectionEventTopic]: (assetIdentifier: string, event: NftEvent, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
[key: AddressTransactionTopic]: (address: string, stxBalance: AddressTransactionWithTransfers, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
[key: AddressStxBalanceTopic]: (address: string, stxBalance: AddressStxBalanceResponse, callback: (...args: WithTimeoutAck<isSender, [string]>) => void) => void;
export interface ServerToClientMessages {
block: (block: Block) => void;
microblock: (microblock: Microblock) => void;
mempool: (transaction: MempoolTransaction) => void;
'nft-event': (event: NftEvent) => void;
[key: TransactionTopic]: (transaction: Transaction | MempoolTransaction) => void;
[key: NftAssetEventTopic]: (assetIdentifier: string, value: string, event: NftEvent) => void;
[key: NftCollectionEventTopic]: (assetIdentifier: string, event: NftEvent) => void;
[key: AddressTransactionTopic]: (address: string, stxBalance: AddressTransactionWithTransfers) => void;
[key: AddressStxBalanceTopic]: (address: string, stxBalance: AddressStxBalanceResponse) => void;
}
80 changes: 15 additions & 65 deletions src/api/routes/ws/channels/socket-io-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const component = { component: 'socket-io' };
* SocketIO channel for sending real time API updates.
*/
export class SocketIOChannel extends WebSocketChannel {
private io?: SocketIOServer<ClientToServerMessages, ServerToClientMessages<true>>;
private io?: SocketIOServer<ClientToServerMessages, ServerToClientMessages>;
private adapter?: Adapter;

constructor(server: http.Server) {
Expand All @@ -44,14 +44,11 @@ export class SocketIOChannel extends WebSocketChannel {
}

connect(): void {
const io = new SocketIOServer<ClientToServerMessages, ServerToClientMessages<true>>(
this.server,
{
cors: { origin: '*' },
pingInterval: getWsPingIntervalMs(),
pingTimeout: getWsPingTimeoutMs(),
}
);
const io = new SocketIOServer<ClientToServerMessages, ServerToClientMessages>(this.server, {
cors: { origin: '*' },
pingInterval: getWsPingIntervalMs(),
pingTimeout: getWsPingTimeoutMs(),
});
this.io = io;

io.on('connection', async socket => {
Expand Down Expand Up @@ -169,13 +166,6 @@ export class SocketIOChannel extends WebSocketChannel {
return false;
}

private async getTopicSockets(room: Topic) {
if (!this.io) {
return;
}
return await this.io.to(room).fetchSockets();
}

send<P extends keyof WebSocketPayload>(
payload: P,
...args: ListenerType<WebSocketPayload[P]>
Expand All @@ -190,52 +180,32 @@ export class SocketIOChannel extends WebSocketChannel {
case 'block': {
const [block] = args as ListenerType<WebSocketPayload['block']>;
this.prometheus?.sendEvent('block');
void this.getTopicSockets('block').then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit('block', block, _ => socket.disconnect(true))
)
);
this.io?.to('block').emit('block', block);
break;
}
case 'microblock': {
const [microblock] = args as ListenerType<WebSocketPayload['microblock']>;
this.prometheus?.sendEvent('microblock');
void this.getTopicSockets('microblock').then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit('microblock', microblock, _ => socket.disconnect(true))
)
);
this.io?.to('microblock').emit('microblock', microblock);
break;
}
case 'mempoolTransaction': {
const [tx] = args as ListenerType<WebSocketPayload['mempoolTransaction']>;
this.prometheus?.sendEvent('mempool');
void this.getTopicSockets('mempool').then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit('mempool', tx, _ => socket.disconnect(true))
)
);
this.io?.to('mempool').emit('mempool', tx);
break;
}
case 'transaction': {
const [tx] = args as ListenerType<WebSocketPayload['transaction']>;
this.prometheus?.sendEvent('transaction');
const topic: TransactionTopic = `transaction:${tx.tx_id}`;
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit(topic, tx, _ => socket.disconnect(true))
)
);
this.io?.to(topic).emit(topic, tx);
break;
}
case 'nftEvent': {
const [event] = args as ListenerType<WebSocketPayload['nftEvent']>;
this.prometheus?.sendEvent('nft-event');
void this.getTopicSockets(`nft-event`).then(sockets =>
sockets?.forEach(socket =>
socket.timeout(timeout).emit('nft-event', event, _ => socket.disconnect(true))
)
);
this.io?.to('nft-event').emit('nft-event', event);
break;
}
case 'nftAssetEvent': {
Expand All @@ -244,13 +214,7 @@ export class SocketIOChannel extends WebSocketChannel {
>;
this.prometheus?.sendEvent('nft-asset-event');
const topic: NftAssetEventTopic = `nft-asset-event:${assetIdentifier}+${value}`;
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket =>
socket
.timeout(timeout)
.emit(topic, assetIdentifier, value, event, _ => socket.disconnect(true))
)
);
this.io?.to(topic).emit(topic, assetIdentifier, value, event);
break;
}
case 'nftCollectionEvent': {
Expand All @@ -259,35 +223,21 @@ export class SocketIOChannel extends WebSocketChannel {
>;
this.prometheus?.sendEvent('nft-collection-event');
const topic: NftCollectionEventTopic = `nft-collection-event:${assetIdentifier}`;
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket =>
socket
.timeout(timeout)
.emit(topic, assetIdentifier, event, _ => socket.disconnect(true))
)
);
this.io?.to(topic).emit(topic, assetIdentifier, event);
break;
}
case 'principalTransaction': {
const [principal, tx] = args as ListenerType<WebSocketPayload['principalTransaction']>;
const topic: AddressTransactionTopic = `address-transaction:${principal}`;
this.prometheus?.sendEvent('address-transaction');
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket => {
socket.timeout(timeout).emit(topic, principal, tx, _ => socket.disconnect(true));
})
);
this.io?.to(topic).emit(topic, principal, tx);
break;
}
case 'principalStxBalance': {
const [principal, balance] = args as ListenerType<WebSocketPayload['principalStxBalance']>;
const topic: AddressStxBalanceTopic = `address-stx-balance:${principal}`;
this.prometheus?.sendEvent('address-stx-balance');
void this.getTopicSockets(topic).then(sockets =>
sockets?.forEach(socket => {
socket.timeout(timeout).emit(topic, principal, balance, _ => socket.disconnect(true));
})
);
this.io?.to(topic).emit(topic, principal, balance);
break;
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/tests/socket-io-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,8 @@ describe('socket-io', () => {
}
});

test('message timeout disconnects client', async () => {
// Per message timeout is not enabled (we don't want to require clients to explicitly reply to events)
test.skip('message timeout disconnects client', async () => {
const address = apiServer.address;
const socket = io(`http://${address}`, {
reconnection: false,
Expand Down
Loading