diff --git a/demo/app/components/cursors.ts b/demo/app/components/cursors.ts index 80a3b584..b81a3bcf 100644 --- a/demo/app/components/cursors.ts +++ b/demo/app/components/cursors.ts @@ -50,10 +50,9 @@ const attachCursors = (space, slideId) => { const cursorContainer = queryDataId(slideContainer, 'slide-cursor-container'); cursorContainer.innerHTML = ''; - const cursor = space.cursors.get(slideId); const self = space.getSelf(); - cursor.on('cursorUpdate', (update) => { + space.cursors.on('cursorsUpdate', (update) => { let cursorNode: HTMLElement = slideContainer.querySelector(`#cursor-${update.connectionId}`); const membersOnSlide = space.getMembers().filter((member) => member.location?.slide === slideId); @@ -77,15 +76,15 @@ const attachCursors = (space, slideId) => { const cursorHandlers = { enter: (event) => { const { top, left } = cursorContainer.getBoundingClientRect(); - cursor.set({ position: { x: event.clientX - left, y: event.clientY - top }, data: { state: 'enter' } }); + space.cursors.set({ position: { x: event.clientX - left, y: event.clientY - top }, data: { state: 'enter' } }); }, move: (event) => { const { top, left } = cursorContainer.getBoundingClientRect(); - cursor.set({ position: { x: event.clientX - left, y: event.clientY - top }, data: { state: 'move' } }); + space.cursors.set({ position: { x: event.clientX - left, y: event.clientY - top }, data: { state: 'move' } }); }, leave: (event) => { const { top, left } = cursorContainer.getBoundingClientRect(); - cursor.set({ position: { x: event.clientX - left, y: event.clientY - top }, data: { state: 'leave' } }); + space.cursors.set({ position: { x: event.clientX - left, y: event.clientY - top }, data: { state: 'leave' } }); }, tabLeft: (event) => { if (document.visibilityState === 'hidden') { @@ -100,7 +99,7 @@ const attachCursors = (space, slideId) => { document.addEventListener('visibilitychange', cursorHandlers.tabLeft); return () => { - cursor.off(); + space.cursors.off(); slideContainer.removeEventListener('mouseenter', cursorHandlers.enter); slideContainer.removeEventListener('mousemove', cursorHandlers.move); slideContainer.removeEventListener('mouseleave', cursorHandlers.leave); diff --git a/src/Cursor.ts b/src/Cursor.ts deleted file mode 100644 index 905e2658..00000000 --- a/src/Cursor.ts +++ /dev/null @@ -1,80 +0,0 @@ -import { Types } from 'ably'; - -import EventEmitter from './utilities/EventEmitter.js'; -import { type CursorUpdate } from './Cursors.js'; - -import CursorBatching from './CursorBatching.js'; -import CursorDispensing from './CursorDispensing.js'; -import { CURSOR_UPDATE } from './utilities/Constants.js'; - -type CursorEventMap = { cursorUpdate: CursorUpdate }; - -const emitterHasListeners = (emitter) => { - const flattenEvents = (obj) => - Object.entries(obj) - .map((_, v) => v) - .flat(); - - return ( - emitter.any.length > 0 || - emitter.anyOnce.length > 0 || - flattenEvents(emitter.events).length > 0 || - flattenEvents(emitter.eventsOnce).length > 0 - ); -}; - -export default class Cursor extends EventEmitter { - /** - * @param {string} name - * @param {channel} Types.RealtimeChannelPromise - */ - constructor( - readonly name: string, - private readonly channel: Types.RealtimeChannelPromise, - private cursorBatching: CursorBatching, - private cursorDispensing: CursorDispensing, - ) { - super(); - } - - private isUnsubscribed() { - return !emitterHasListeners(this.channel['subscriptions']); - } - - private subscribe() { - this.channel.subscribe(CURSOR_UPDATE, (message) => { - this.cursorDispensing.processBatch(message); - }); - } - - private unsubscribe() { - this.channel.unsubscribe(); - } - - /** - * Schedules a cursor update event to be sent that will cause the following events to fire - * - * @param {CursorUpdate} cursor - * @return {void} - */ - set(cursor: Pick): void { - this.cursorBatching.pushCursorPosition(this.name, cursor); - } - - on(...args) { - super.on(...args); - - if (this.isUnsubscribed()) { - this.subscribe(); - } - } - - off(...args) { - super.off(...args); - const hasListeners = emitterHasListeners(this); - - if (args.length > 0 || !hasListeners) { - this.unsubscribe(); - } - } -} diff --git a/src/CursorBatching.ts b/src/CursorBatching.ts index d5cf44f1..2aea44cc 100644 --- a/src/CursorBatching.ts +++ b/src/CursorBatching.ts @@ -4,10 +4,10 @@ import { CursorUpdate } from './Cursors.js'; import { CURSOR_UPDATE } from './utilities/Constants.js'; import type { StrictCursorsOptions } from './options/CursorsOptions.js'; -type OutgoingBuffer = Record[]>; +type OutgoingBuffer = Pick[]; export default class CursorBatching { - outgoingBuffers: OutgoingBuffer = {}; + outgoingBuffers: OutgoingBuffer = []; batchTime: number; @@ -27,11 +27,11 @@ export default class CursorBatching { this.batchTime = outboundBatchInterval; } - pushCursorPosition(name: string, cursor: Pick) { + pushCursorPosition(cursor: Pick) { // Ignore the cursor update if there is no one listening if (!this.shouldSend) return; this.hasMovement = true; - this.pushToBuffer(name, cursor); + this.pushToBuffer(cursor); this.publishFromBuffer(CURSOR_UPDATE); } @@ -41,12 +41,8 @@ export default class CursorBatching { this.batchTime = (members.length - 1) * this.outboundBatchInterval; } - private pushToBuffer(key: string, value: Pick) { - if (this.outgoingBuffers[key]) { - this.outgoingBuffers[key].push(value); - } else { - this.outgoingBuffers[key] = [value]; - } + private pushToBuffer(value: Pick) { + this.outgoingBuffers.push(value); } private async publishFromBuffer(eventName: string) { @@ -62,10 +58,10 @@ export default class CursorBatching { return; } // Must be copied here to avoid a race condition where the buffer is cleared before the publish happens - const bufferCopy = { ...this.outgoingBuffers }; + const bufferCopy = [...this.outgoingBuffers]; this.channel.publish(eventName, bufferCopy); setTimeout(() => this.batchToChannel(eventName), this.batchTime); - this.outgoingBuffers = {}; + this.outgoingBuffers = []; this.hasMovement = false; this.isRunning = true; } diff --git a/src/CursorDispensing.ts b/src/CursorDispensing.ts index f1a528a4..a40e5db2 100644 --- a/src/CursorDispensing.ts +++ b/src/CursorDispensing.ts @@ -27,10 +27,6 @@ export default class CursorDispensing { if (!update) continue; this.cursors.emit('cursorsUpdate', update); - - const cursor = this.cursors.cursors[update.name]; - if (!cursor) continue; - cursor.emit('cursorUpdate', update); } if (this.bufferHaveData()) { @@ -67,24 +63,21 @@ export default class CursorDispensing { } processBatch(message: Types.Message) { - Object.keys(message.data).forEach((name) => { - const updates = message.data[name] || []; - - updates.forEach((update) => { - const enhancedMsg = { - name, - clientId: message.clientId, - connectionId: message.connectionId, - position: update.position, - data: update.data, - }; - - if (this.buffer[enhancedMsg.connectionId]) { - this.buffer[enhancedMsg.connectionId].push(enhancedMsg); - } else { - this.buffer[enhancedMsg.connectionId] = [enhancedMsg]; - } - }); + const updates = message.data || []; + + updates.forEach((update) => { + const enhancedMsg = { + clientId: message.clientId, + connectionId: message.connectionId, + position: update.position, + data: update.data, + }; + + if (this.buffer[enhancedMsg.connectionId]) { + this.buffer[enhancedMsg.connectionId].push(enhancedMsg); + } else { + this.buffer[enhancedMsg.connectionId] = [enhancedMsg]; + } }); if (!this.handlerRunning && this.bufferHaveData()) { diff --git a/src/CursorHistory.ts b/src/CursorHistory.ts index 625675cb..f64f9ea2 100644 --- a/src/CursorHistory.ts +++ b/src/CursorHistory.ts @@ -22,11 +22,9 @@ export default class CursorHistory { private messageToUpdate( connectionId: string, clientId: string, - cursorName: string, update: Pick, ): CursorUpdate { return { - name: cursorName, clientId, connectionId, position: update.position, @@ -34,31 +32,6 @@ export default class CursorHistory { }; } - private namedCursorUpdates( - cursorName: CursorName, - connections: ConnectionsLastPosition, - page: Types.PaginatedResult, - ): ConnectionsLastPosition { - return Object.fromEntries( - Object.entries(connections).map(([connectionId, cursors]) => { - if (cursors && cursors[cursorName]) return [connectionId, cursors[cursorName]]; - - const lastMessage = page.items.find((item) => item.connectionId === connectionId); - if (!lastMessage) return [connectionId, null]; - - const { data, clientId }: Types.Message & { data: Record } = lastMessage; - const updates: CursorUpdate[] = data[cursorName] || []; - - if (updates.length > 0) { - const lastUpdate = updates[updates.length - 1]; - return [connectionId, this.messageToUpdate(connectionId, clientId, cursorName, lastUpdate)]; - } else { - return [connectionId, null]; - } - }), - ); - } - private allCursorUpdates( connections: ConnectionsLastPosition, page: Types.PaginatedResult, @@ -68,38 +41,17 @@ export default class CursorHistory { const lastMessage = page.items.find((item) => item.connectionId === connectionId); if (!lastMessage) return [connectionId, cursors]; - const { data, clientId }: { data: Record } & Pick = - lastMessage; + const { data, clientId }: { data: CursorUpdate[] } & Pick = lastMessage; - const updatedCursors = Object.fromEntries( - Object.entries(data).map(([cursorName, updates]) => { - if (cursors && cursors[cursorName]) return [cursorName, cursors[cursorName]]; + const lastUpdate = + data?.length > 0 ? this.messageToUpdate(connectionId, clientId, data[data.length - 1]) : null; - if (updates.length > 0) { - const lastUpdate = updates[updates.length - 1]; - return [cursorName, this.messageToUpdate(connectionId, clientId, cursorName, lastUpdate)]; - } else { - return [cursorName, null]; - } - }), - ); - - return [connectionId, updatedCursors]; + return [connectionId, lastUpdate]; }), ); } - private mapPageToConnections( - page: Types.PaginatedResult, - connections: ConnectionsLastPosition, - cursorName?: string, - ): ConnectionsLastPosition { - return cursorName - ? this.namedCursorUpdates(cursorName, connections, page) - : this.allCursorUpdates(connections, page); - } - - async getLastCursorUpdate(cursorName?: string): Promise { + async getLastCursorUpdate(): Promise { const members = await this.channel.presence.get(); if (members.length === 0) return {}; @@ -115,12 +67,12 @@ export default class CursorHistory { let pageNo = 1; let page = await history.current(); - connections = this.mapPageToConnections(page, connections, cursorName); + connections = this.allCursorUpdates(connections, page); pageNo++; while (pageNo <= this.paginationLimit && this.positionsMissing(connections) && history.hasNext()) { page = await history.next(); - connections = this.mapPageToConnections(page, connections, cursorName); + connections = this.allCursorUpdates(connections, page); pageNo++; } diff --git a/src/Cursors.mockClient.test.ts b/src/Cursors.mockClient.test.ts index 43235c51..e44b0329 100644 --- a/src/Cursors.mockClient.test.ts +++ b/src/Cursors.mockClient.test.ts @@ -1,9 +1,8 @@ -import { it, describe, expect, vi, expectTypeOf, beforeEach, vitest, afterEach } from 'vitest'; +import { it, describe, expect, vi, beforeEach, vitest, afterEach } from 'vitest'; import { Realtime, Types } from 'ably/promises'; import Space from './Space.js'; import { createPresenceMessage } from './utilities/test/fakes.js'; -import Cursor from './Cursor.js'; import CursorBatching from './CursorBatching.js'; import { CURSOR_UPDATE } from './utilities/Constants.js'; import CursorDispensing from './CursorDispensing.js'; @@ -49,24 +48,10 @@ describe('Cursors (mockClient)', () => { vi.useRealTimers(); }); - it('creates a cursor and returns it', ({ space }) => { - expectTypeOf(space.cursors.get('cursor1')).toMatchTypeOf(); - }); - - it('returns an existing cursor', ({ space }) => { - const cursor1 = space.cursors.get('cursor1'); - const cursor2 = space.cursors.get('cursor2'); - expect(cursor1).not.toEqual(cursor2); - expect(space.cursors.get('cursor1')).toEqual(cursor1); - }); - it('emits a cursorsUpdate event', ({ space, dispensing, batching, fakeMessageStub }) => { const fakeMessage = { ...fakeMessageStub, - data: { - cursor1: [{ position: { x: 1, y: 1 } }], - cursor2: [{ position: { x: 1, y: 2 }, data: { color: 'red' } }], - }, + data: [{ position: { x: 1, y: 1 } }, { position: { x: 1, y: 2 }, data: { color: 'red' } }], }; const spy = vitest.fn(); @@ -80,7 +65,6 @@ describe('Cursors (mockClient)', () => { data: undefined, clientId: 'clientId', connectionId: 'connectionId', - name: 'cursor1', }); vi.advanceTimersByTime(batching.batchTime * 2); @@ -90,43 +74,8 @@ describe('Cursors (mockClient)', () => { data: { color: 'red' }, clientId: 'clientId', connectionId: 'connectionId', - name: 'cursor2', }); }); - - it('emits cursorUpdate for a specific cursor event', ({ - space, - dispensing, - batching, - fakeMessageStub, - }) => { - const fakeMessage = { - ...fakeMessageStub, - data: { - cursor1: [{ position: { x: 1, y: 1 } }], - cursor2: [{ position: { x: 1, y: 2 }, data: { color: 'red' } }], - }, - }; - - const spy = vitest.fn(); - const catchAllSpy = vitest.fn(); - space.cursors.on(catchAllSpy); - space.cursors.get('cursor1').on(spy); - dispensing.processBatch(fakeMessage); - - vi.advanceTimersByTime(batching.batchTime * 2); - - const result = { - position: { x: 1, y: 1 }, - data: undefined, - clientId: 'clientId', - connectionId: 'connectionId', - name: 'cursor1', - }; - - expect(spy).toHaveBeenCalledWith(result); - expect(catchAllSpy).toHaveBeenCalledWith(result); - }); }); describe('CursorBatching', () => { @@ -176,18 +125,16 @@ describe('Cursors (mockClient)', () => { expect(batching.hasMovement).toBeTruthy(); }); - it('creates an outgoingBuffer for a new cursor movement', (context) => { - const batching = context.batching as any; - batching.pushCursorPosition('cursor1', { position: { x: 1, y: 1 }, data: {} }); - expect(batching.outgoingBuffers.cursor1).toEqual([{ position: { x: 1, y: 1 }, data: {} }]); + it('creates an outgoingBuffer for a new cursor movement', ({ batching }) => { + batching.pushCursorPosition({ position: { x: 1, y: 1 }, data: {} }); + expect(batching.outgoingBuffers).toEqual([{ position: { x: 1, y: 1 }, data: {} }]); }); - it('adds cursor data to an existing buffer', (context) => { - const batching = context.batching as any; - batching.pushCursorPosition('cursor1', { position: { x: 1, y: 1 }, data: {} }); - expect(batching.outgoingBuffers.cursor1).toEqual([{ position: { x: 1, y: 1 }, data: {} }]); - batching.pushCursorPosition('cursor1', { position: { x: 2, y: 2 }, data: {} }); - expect(batching.outgoingBuffers.cursor1).toEqual([ + it('adds cursor data to an existing buffer', ({ batching }) => { + batching.pushCursorPosition({ position: { x: 1, y: 1 }, data: {} }); + expect(batching.outgoingBuffers).toEqual([{ position: { x: 1, y: 1 }, data: {} }]); + batching.pushCursorPosition({ position: { x: 2, y: 2 }, data: {} }); + expect(batching.outgoingBuffers).toEqual([ { position: { x: 1, y: 1 }, data: {} }, { position: { x: 2, y: 2 }, data: {} }, ]); @@ -224,22 +171,19 @@ describe('Cursors (mockClient)', () => { expect(spy).not.toHaveBeenCalled(); }); - it('should publish the cursor buffer', async (context) => { - const space = context.space; - const batching = context.batching as any; + it('should publish the cursor buffer', async ({ space, batching }) => { batching.hasMovement = true; - batching.outgoingBuffers = { cursor1: [{ position: { x: 1, y: 1 }, data: {} }] }; + batching.outgoingBuffers = [{ position: { x: 1, y: 1 }, data: {} }]; const spy = vi.spyOn(space.cursors['channel'], 'publish'); - await batching.batchToChannel(CURSOR_UPDATE); - expect(spy).toHaveBeenCalledWith(CURSOR_UPDATE, { cursor1: [{ position: { x: 1, y: 1 }, data: {} }] }); + await batching['batchToChannel'](CURSOR_UPDATE); + expect(spy).toHaveBeenCalledWith(CURSOR_UPDATE, [{ position: { x: 1, y: 1 }, data: {} }]); }); - it('should clear the buffer', async (context) => { - const batching = context.batching as any; + it('should clear the buffer', async ({ batching }) => { batching.hasMovement = true; - batching.outgoingBuffers = { cursor1: [{ position: { x: 1, y: 1 }, data: {} }] }; - await batching.batchToChannel(CURSOR_UPDATE); - expect(batching.outgoingBuffers).toEqual({}); + batching.outgoingBuffers = [{ position: { x: 1, y: 1 }, data: {} }]; + await batching['batchToChannel'](CURSOR_UPDATE); + expect(batching.outgoingBuffers).toEqual([]); }); it('should set hasMovements to false', async (context) => { @@ -261,9 +205,7 @@ describe('Cursors (mockClient)', () => { const fakeMessage = { ...fakeMessageStub, - data: { - cursor1: [], - }, + data: [], }; dispensing.processBatch(fakeMessage); @@ -278,9 +220,7 @@ describe('Cursors (mockClient)', () => { const fakeMessage = { ...fakeMessageStub, - data: { - cursor1: [{ position: { x: 1, y: 1 } }], - }, + data: [{ position: { x: 1, y: 1 } }], }; dispensing['handlerRunning'] = true; @@ -293,9 +233,7 @@ describe('Cursors (mockClient)', () => { const fakeMessage = { ...fakeMessageStub, - data: { - cursor1: [{ position: { x: 1, y: 1 } }], - }, + data: [{ position: { x: 1, y: 1 } }], }; dispensing.processBatch(fakeMessage); @@ -308,10 +246,11 @@ describe('Cursors (mockClient)', () => { }) => { const fakeMessage = { ...fakeMessageStub, - data: { - cursor1: [{ position: { x: 1, y: 1 } }, { position: { x: 2, y: 3 }, data: { color: 'blue' } }], - cursor2: [{ position: { x: 5, y: 4 } }], - }, + data: [ + { position: { x: 1, y: 1 } }, + { position: { x: 2, y: 3 }, data: { color: 'blue' } }, + { position: { x: 5, y: 4 } }, + ], }; dispensing.processBatch(fakeMessage); @@ -322,21 +261,18 @@ describe('Cursors (mockClient)', () => { data: undefined, clientId: 'clientId', connectionId: 'connectionId', - name: 'cursor1', }, { position: { x: 2, y: 3 }, data: { color: 'blue' }, clientId: 'clientId', connectionId: 'connectionId', - name: 'cursor1', }, { position: { x: 5, y: 4 }, data: undefined, clientId: 'clientId', connectionId: 'connectionId', - name: 'cursor2', }, ], }); @@ -347,10 +283,11 @@ describe('Cursors (mockClient)', () => { const fakeMessage = { ...fakeMessageStub, - data: { - cursor1: [{ position: { x: 1, y: 1 } }, { position: { x: 2, y: 3 }, data: { color: 'blue' } }], - cursor2: [{ position: { x: 5, y: 4 } }], - }, + data: [ + { position: { x: 1, y: 1 } }, + { position: { x: 2, y: 3 }, data: { color: 'blue' } }, + { position: { x: 5, y: 4 } }, + ], }; expect(dispensing['handlerRunning']).toBe(false); @@ -390,25 +327,17 @@ describe('Cursors (mockClient)', () => { expect(await space.cursors.getAll()).toEqual({}); }); - it('gets the last position of all cursors from all connected clients', async ({ - space, - history, - }) => { + it('gets the last position from all connected clients', async ({ space, history }) => { const client1Message = { connectionId: 'connectionId1', clientId: 'clientId1', - data: { - cursor1: [{ position: { x: 1, y: 1 } }, { position: { x: 2, y: 3 }, data: { color: 'blue' } }], - cursor2: [{ position: { x: 5, y: 4 } }], - }, + data: [{ position: { x: 1, y: 1 } }, { position: { x: 2, y: 3 }, data: { color: 'blue' } }], }; const client2Message = { connectionId: 'connectionId2', clientId: 'clientId2', - data: { - cursor2: [{ position: { x: 25, y: 44 } }], - }, + data: [{ position: { x: 25, y: 44 } }], }; vi.spyOn(history['channel']['presence'], 'get').mockImplementation(async () => [ @@ -426,113 +355,24 @@ describe('Cursors (mockClient)', () => { expect(await space.cursors.getAll()).toEqual({ connectionId1: { - cursor1: { - connectionId: 'connectionId1', - clientId: 'clientId1', - data: { - color: 'blue', - }, - name: 'cursor1', - position: { - x: 2, - y: 3, - }, - }, - cursor2: { - connectionId: 'connectionId1', - clientId: 'clientId1', - data: undefined, - name: 'cursor2', - position: { - x: 5, - y: 4, - }, + connectionId: 'connectionId1', + clientId: 'clientId1', + data: { + color: 'blue', }, - }, - connectionId2: { - cursor2: { - connectionId: 'connectionId2', - clientId: 'clientId2', - data: undefined, - name: 'cursor2', - position: { - x: 25, - y: 44, - }, + position: { + x: 2, + y: 3, }, }, - }); - }); - - it('gets the last position of a given cursor from all connected clients', async ({ - space, - history, - }) => { - const client1Message = { - connectionId: 'connectionId1', - clientId: 'clientId1', - data: { - cursor1: [{ position: { x: 1, y: 1 } }, { position: { x: 2, y: 3 }, data: { color: 'blue' } }], - cursor2: [{ position: { x: 5, y: 4 } }], - }, - }; - - const client2Message = { - connectionId: 'connectionId2', - clientId: 'clientId2', - data: { - cursor2: [{ position: { x: 25, y: 44 } }], - }, - }; - - vi.spyOn(history['channel']['presence'], 'get').mockImplementation(async () => [ - createPresenceMessage('enter', { connectionId: 'connectionId1' }), - createPresenceMessage('enter', { connectionId: 'connectionId2' }), - ]); - - const page = await history['channel'].history(); - vi.spyOn(page, 'current').mockImplementationOnce(async () => { - return { - ...history['channel']['history'](), - items: [client1Message, client2Message], - }; - }); - expect(await space.cursors.getAll()).toEqual({ - connectionId1: { - cursor1: { - connectionId: 'connectionId1', - clientId: 'clientId1', - data: { - color: 'blue', - }, - name: 'cursor1', - position: { - x: 2, - y: 3, - }, - }, - cursor2: { - connectionId: 'connectionId1', - clientId: 'clientId1', - data: undefined, - name: 'cursor2', - position: { - x: 5, - y: 4, - }, - }, - }, connectionId2: { - cursor2: { - connectionId: 'connectionId2', - clientId: 'clientId2', - data: undefined, - name: 'cursor2', - position: { - x: 25, - y: 44, - }, + connectionId: 'connectionId2', + clientId: 'clientId2', + data: undefined, + position: { + x: 25, + y: 44, }, }, }); diff --git a/src/Cursors.ts b/src/Cursors.ts index 4cefae89..7c2d1d0d 100644 --- a/src/Cursors.ts +++ b/src/Cursors.ts @@ -1,12 +1,13 @@ import { Types } from 'ably'; import Space from './Space.js'; -import Cursor from './Cursor.js'; import CursorBatching from './CursorBatching.js'; import CursorDispensing from './CursorDispensing.js'; import { OUTGOING_BATCH_TIME_DEFAULT, PAGINATION_LIMIT_DEFAULT } from './utilities/Constants.js'; import EventEmitter from './utilities/EventEmitter.js'; import CursorHistory from './CursorHistory.js'; +import { CURSOR_UPDATE } from './utilities/Constants.js'; + import type { CursorsOptions, StrictCursorsOptions } from './options/CursorsOptions.js'; type CursorPosition = { x: number; y: number }; @@ -14,7 +15,6 @@ type CursorPosition = { x: number; y: number }; type CursorData = Record; type CursorUpdate = { - name: string; clientId: string; connectionId: string; position: CursorPosition; @@ -25,6 +25,20 @@ type CursorsEventMap = { cursorsUpdate: Record; }; +const emitterHasListeners = (emitter) => { + const flattenEvents = (obj) => + Object.entries(obj) + .map((_, v) => v) + .flat(); + + return ( + emitter.any.length > 0 || + emitter.anyOnce.length > 0 || + flattenEvents(emitter.events).length > 0 || + flattenEvents(emitter.eventsOnce).length > 0 + ); +}; + export default class Cursors extends EventEmitter { private readonly cursorBatching: CursorBatching; private readonly cursorDispensing: CursorDispensing; @@ -32,8 +46,6 @@ export default class Cursors extends EventEmitter { private channel: Types.RealtimeChannelPromise; readonly options: StrictCursorsOptions; - cursors: Record = {}; - constructor(private space: Space, options: CursorsOptions = {}) { super(); this.options = { @@ -44,23 +56,63 @@ export default class Cursors extends EventEmitter { for (const option in options) { if (options[option]) this.options[option] = options[option]; } + const spaceChannelName = space.getChannelName(); + this.channel = space.client.channels.get(`${spaceChannelName}_cursors`); this.cursorBatching = new CursorBatching(this.channel, this.options.outboundBatchInterval); this.cursorHistory = new CursorHistory(this.channel, this.options.paginationLimit); this.cursorDispensing = new CursorDispensing(this, this.cursorBatching); } - get(name: string): Cursor { - if (!this.cursors[name]) { - this.cursors[name] = new Cursor(name, this.channel, this.cursorBatching, this.cursorDispensing); + /** + * Schedules a cursor update event to be sent that will cause the following events to fire + * + * @param {CursorUpdate} cursor + * @return {void} + */ + set(cursor: Pick): void { + const self = this.space.getSelf(); + if (!self) { + throw new Error('Must enter a space before setting a cursor update'); } - return this.cursors[name]; + this.cursorBatching.pushCursorPosition(cursor); + } + + private isUnsubscribed() { + return !emitterHasListeners(this.channel['subscriptions']); + } + + private subscribe() { + this.channel.subscribe(CURSOR_UPDATE, (message) => { + this.cursorDispensing.processBatch(message); + }); + } + + private unsubscribe() { + this.channel.unsubscribe(); + } + + on(...args) { + super.on(...args); + + if (this.isUnsubscribed()) { + this.subscribe(); + } + } + + off(...args) { + super.off(...args); + const hasListeners = emitterHasListeners(this); + + if (args.length > 0 || !hasListeners) { + this.unsubscribe(); + } } - async getAll(cursorName?: string) { - return await this.cursorHistory.getLastCursorUpdate(cursorName); + async getAll() { + return await this.cursorHistory.getLastCursorUpdate(); } }