From 04e8ff36b38d39839391286c7862e548dc041af6 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Tue, 16 Jul 2024 19:20:59 +0100 Subject: [PATCH 1/7] feat: split large incremental snapshots --- .../replay/sessionrecording-utils.test.ts | 276 +++++++++++++----- .../replay/sessionrecording-utils.ts | 146 +++++++-- 2 files changed, 325 insertions(+), 97 deletions(-) diff --git a/src/__tests__/extensions/replay/sessionrecording-utils.test.ts b/src/__tests__/extensions/replay/sessionrecording-utils.test.ts index 8e27f55b4..dbcdf4b76 100644 --- a/src/__tests__/extensions/replay/sessionrecording-utils.test.ts +++ b/src/__tests__/extensions/replay/sessionrecording-utils.test.ts @@ -1,16 +1,20 @@ import { - ensureMaxMessageSize, - replacementImageURI, - truncateLargeConsoleLogs, CONSOLE_LOG_PLUGIN_NAME, - PLUGIN_EVENT_TYPE, + ensureMaxMessageSize, + estimateSize, FULL_SNAPSHOT_EVENT_TYPE, - splitBuffer, + PLUGIN_EVENT_TYPE, + replacementImageURI, SEVEN_MEGABYTES, - estimateSize, + splitBuffer, + truncateLargeConsoleLogs, } from '../../../extensions/replay/sessionrecording-utils' import { largeString, threeMBAudioURI, threeMBImageURI } from '../test_data/sessionrecording-utils-test-data' -import { eventWithTime } from '@rrweb/types' +import { eventWithTime, incrementalSnapshotEvent, IncrementalSource } from '@rrweb/types' +import { serializedNodeWithId } from 'rrweb-snapshot' + +const ONE_MEGABYTE = 1024 * 1024 +const ONE_MEGABYTE_OF_DATA = 'a'.repeat(1024 * 1024) describe(`SessionRecording utility functions`, () => { describe(`filterLargeDataURLs`, () => { @@ -244,93 +248,209 @@ describe(`SessionRecording utility functions`, () => { }) describe('splitBuffer', () => { - it('should return the same buffer if size is less than SEVEN_MEGABYTES', () => { - const buffer = { - size: 5 * 1024 * 1024, - data: new Array(100).fill(0), - sessionId: 'session1', - windowId: 'window1', - } + describe('when many items in the buffer', () => { + it('should return the same buffer if size is 
less than SEVEN_MEGABYTES', () => { + const buffer = { + size: 5 * 1024 * 1024, + data: new Array(100).fill(0), + sessionId: 'session1', + windowId: 'window1', + } - const result = splitBuffer(buffer) - expect(result).toEqual([buffer]) - }) + const result = splitBuffer(buffer) + expect(result).toEqual([buffer]) + }) - it('should split the buffer into two halves if size is greater than or equal to SEVEN_MEGABYTES', () => { - const data = new Array(100).fill(0) - const expectedSize = estimateSize(new Array(50).fill(0)) - const buffer = { - size: estimateSize(data), - data: data, - sessionId: 'session1', - windowId: 'window1', - } + it('should split the buffer into two halves if size is greater than or equal to SEVEN_MEGABYTES', () => { + const data = new Array(100).fill(0) + const expectedSize = estimateSize(new Array(50).fill(0)) + const buffer = { + size: estimateSize(data), + data: data, + sessionId: 'session1', + windowId: 'window1', + } - // size limit just below the size of the buffer - const result = splitBuffer(buffer, 200) + // size limit just below the size of the buffer + const result = splitBuffer(buffer, 200) - expect(result).toHaveLength(2) - expect(result[0].data).toEqual(buffer.data.slice(0, 50)) - expect(result[0].size).toEqual(expectedSize) - expect(result[1].data).toEqual(buffer.data.slice(50)) - expect(result[1].size).toEqual(expectedSize) - }) + expect(result).toHaveLength(2) + expect(result[0].data).toEqual(buffer.data.slice(0, 50)) + expect(result[0].size).toEqual(expectedSize) + expect(result[1].data).toEqual(buffer.data.slice(50)) + expect(result[1].size).toEqual(expectedSize) + }) + + it('should recursively split the buffer until each part is smaller than SEVEN_MEGABYTES', () => { + const largeDataArray = new Array(100).fill('a'.repeat(1024 * 1024)) + const largeDataSize = estimateSize(largeDataArray) // >100mb + const buffer = { + size: largeDataSize, + data: largeDataArray, + sessionId: 'session1', + windowId: 'window1', + } - it('should 
recursively split the buffer until each part is smaller than SEVEN_MEGABYTES', () => { - const largeDataArray = new Array(100).fill('a'.repeat(1024 * 1024)) - const largeDataSize = estimateSize(largeDataArray) // >100mb - const buffer = { - size: largeDataSize, - data: largeDataArray, - sessionId: 'session1', - windowId: 'window1', - } + const result = splitBuffer(buffer) - const result = splitBuffer(buffer) + expect(result.length).toBe(15) + let partTotal = 0 + let sentArray: any[] = [] + result.forEach((part) => { + expect(part.size).toBeLessThan(SEVEN_MEGABYTES * 1.2) + sentArray = sentArray.concat(part.data) + partTotal += part.size + }) - expect(result.length).toBe(20) - let partTotal = 0 - let sentArray: any[] = [] - result.forEach((part) => { - expect(part.size).toBeLessThan(SEVEN_MEGABYTES) - sentArray = sentArray.concat(part.data) - partTotal += part.size + // it's a bit bigger because we have extra square brackets and commas when stringified + expect(partTotal).toBeGreaterThan(largeDataSize) + // but not much bigger! + expect(partTotal).toBeLessThan(largeDataSize * 1.001) + // we sent the same data overall + expect(JSON.stringify(sentArray)).toEqual(JSON.stringify(largeDataArray)) }) - // it's a bit bigger because we have extra square brackets and commas when stringified - expect(partTotal).toBeGreaterThan(largeDataSize) - // but not much bigger! 
- expect(partTotal).toBeLessThan(largeDataSize * 1.001) - // we sent the same data overall - expect(JSON.stringify(sentArray)).toEqual(JSON.stringify(largeDataArray)) - }) + it('should handle buffer with size exactly SEVEN_MEGABYTES', () => { + const buffer = { + size: SEVEN_MEGABYTES, + data: new Array(100).fill(0), + sessionId: 'session1', + windowId: 'window1', + } + + const result = splitBuffer(buffer, 101) + + expect(result).toHaveLength(2) + expect(result[0].data).toEqual(buffer.data.slice(0, 50)) + expect(result[1].data).toEqual(buffer.data.slice(50)) + }) - it('should handle buffer with size exactly SEVEN_MEGABYTES', () => { - const buffer = { - size: SEVEN_MEGABYTES, - data: new Array(100).fill(0), - sessionId: 'session1', - windowId: 'window1', - } + it('should not split buffer if it has only one element', () => { + const buffer = { + size: 10 * 1024 * 1024, + data: [0], + sessionId: 'session1', + windowId: 'window1', + } - const result = splitBuffer(buffer) + const result = splitBuffer(buffer) - expect(result).toHaveLength(2) - expect(result[0].data).toEqual(buffer.data.slice(0, 50)) - expect(result[1].data).toEqual(buffer.data.slice(50)) + expect(result).toEqual([buffer]) + }) }) - it('should not split buffer if it has only one element', () => { - const buffer = { - size: 10 * 1024 * 1024, - data: [0], - sessionId: 'session1', - windowId: 'window1', - } + describe('when one item in the buffer', () => { + it('should ignore full snapshots (for now)', () => { + const buffer = { + size: 5 * 1024 * 1024, + data: [{ type: '2' }], + sessionId: 'session1', + windowId: 'window1', + } - const result = splitBuffer(buffer) + const result = splitBuffer(buffer) + expect(result).toEqual([buffer]) + }) - expect(result).toEqual([buffer]) + it('should split incremental adds', () => { + const incrementalSnapshot: incrementalSnapshotEvent = { + type: 3, + data: { + source: IncrementalSource.Mutation, + adds: [ + { + parentId: 1, + nextId: null, + node: ONE_MEGABYTE_OF_DATA 
as unknown as serializedNodeWithId, + }, + { + parentId: 2, + nextId: null, + node: ONE_MEGABYTE_OF_DATA as unknown as serializedNodeWithId, + }, + ], + texts: [], + attributes: [], + // removes are processed first by the replayer, so we need to be sure we're emitting them first + removes: [{ parentId: 1, id: 2 }], + }, + } + const buffer = { + size: 5 * 1024 * 1024, + data: [incrementalSnapshot], + sessionId: 'session1', + windowId: 'window1', + } + + const result = splitBuffer(buffer, ONE_MEGABYTE * 0.9) + expect(result).toHaveLength(3) + expect(result[0]).toEqual({ + ...buffer, + size: 23, + data: [ + { + type: 3, + data: { + // removes are processed first by the replayer, so we need to be sure we're emitting them first + removes: [{ parentId: 1, id: 2 }], + adds: [], + texts: [], + attributes: [], + source: 0, + }, + } as incrementalSnapshotEvent, + ], + }) + expect(result[1]).toEqual( + // the two adds each only fit one at a time, so they are split in order + // TODO if we sort these by timestamp at playback what's going to happen... 
+ // we need to maintain the original order + { + ...buffer, + size: 1048616, + data: [ + { + type: 3, + data: { + source: 0, + texts: [], + attributes: [], + removes: [], + adds: [ + { + parentId: 1, + nextId: null, + node: ONE_MEGABYTE_OF_DATA as unknown as serializedNodeWithId, + }, + ], + }, + }, + ], + } + ) + expect(result[2]).toEqual({ + ...buffer, + size: 1048616, + data: [ + { + type: 3, + data: { + source: 0, + texts: [], + attributes: [], + removes: [], + adds: [ + { + parentId: 2, + nextId: null, + node: ONE_MEGABYTE_OF_DATA as unknown as serializedNodeWithId, + }, + ], + }, + }, + ], + }) + }) }) }) }) diff --git a/src/extensions/replay/sessionrecording-utils.ts b/src/extensions/replay/sessionrecording-utils.ts index 91eff8d04..6651570a7 100644 --- a/src/extensions/replay/sessionrecording-utils.ts +++ b/src/extensions/replay/sessionrecording-utils.ts @@ -1,10 +1,11 @@ -import type { +import { blockClass, eventWithTime, hooksParam, KeepIframeSrcFn, listenerHandler, maskTextClass, + mutationData, pluginEvent, RecordPlugin, SamplingStrategy, @@ -165,27 +166,134 @@ export function truncateLargeConsoleLogs(_event: eventWithTime) { export const SEVEN_MEGABYTES = 1024 * 1024 * 7 * 0.9 // ~7mb (with some wiggle room) -// recursively splits large buffers into smaller ones +function sliceList(list: any[], sizeLimit: number): any[][] { + const size = estimateSize(list) + if (size < sizeLimit) { + return [list] + } else { + const times = Math.ceil(size / sizeLimit) + const chunkLength = Math.ceil(list.length / times) + const chunks = [] + for (let i = 0; i < list.length; i += chunkLength) { + chunks.push(list.slice(i, i + chunkLength)) + } + return chunks + } +} + +function sliceBuffer(buffer: SnapshotBuffer, sizeLimit: number): SnapshotBuffer[] { + const bufferChunks = sliceList(buffer.data, sizeLimit) + return bufferChunks.map((data) => ({ + size: estimateSize(data), + data, + sessionId: buffer.sessionId, + windowId: buffer.windowId, + })) +} + +function 
hasIncrementalContent(buffer: SnapshotBuffer): boolean { + return buffer.data.some( + (item) => + item.data?.adds?.length || + item.data?.removes?.length || + item.data?.texts?.length || + item.data?.attributes?.length + ) +} + // uses a pretty high size limit to avoid splitting too much export function splitBuffer(buffer: SnapshotBuffer, sizeLimit: number = SEVEN_MEGABYTES): SnapshotBuffer[] { if (buffer.size >= sizeLimit && buffer.data.length > 1) { - const half = Math.floor(buffer.data.length / 2) - const firstHalf = buffer.data.slice(0, half) - const secondHalf = buffer.data.slice(half) - return [ - splitBuffer({ - size: estimateSize(firstHalf), - data: firstHalf, - sessionId: buffer.sessionId, - windowId: buffer.windowId, - }), - splitBuffer({ - size: estimateSize(secondHalf), - data: secondHalf, - sessionId: buffer.sessionId, - windowId: buffer.windowId, - }), - ].flatMap((x) => x) + return sliceBuffer(buffer, sizeLimit) + } else if (buffer.size >= sizeLimit && buffer.data.length === 1) { + // we can maybe split up incremental snapshots, or directly edit data image urls here + const bufferedData = buffer.data[0] + if ( + bufferedData.type === INCREMENTAL_SNAPSHOT_EVENT_TYPE && + bufferedData.data.source === MUTATION_SOURCE_TYPE + ) { + // so at this point we know that the buffer is too large, and we have a single incremental snapshot + // it may be that a single item in the buffer is too large + // or that there are many small items of one or more types that end up being too large + // rrweb processes removes, then adds, then texts, the attributes, + // so we can split them in that order + const bufferedMutations = bufferedData.data as mutationData + const removes = sliceList(bufferedMutations.removes, sizeLimit) + const adds = sliceList(bufferedMutations.adds, sizeLimit) + const texts = sliceList(bufferedMutations.texts, sizeLimit) + const attributes = sliceList(bufferedMutations.attributes, sizeLimit) + return [ + ...removes.map((remove) => ({ + size: 
estimateSize(remove), + data: [ + { + type: INCREMENTAL_SNAPSHOT_EVENT_TYPE, + data: { + source: MUTATION_SOURCE_TYPE, + adds: [], + texts: [], + attributes: [], + removes: remove, + }, + }, + ], + sessionId: buffer.sessionId, + windowId: buffer.windowId, + })), + ...adds.map((add) => ({ + size: estimateSize(add), + data: [ + { + type: INCREMENTAL_SNAPSHOT_EVENT_TYPE, + data: { + source: MUTATION_SOURCE_TYPE, + adds: add, + texts: [], + attributes: [], + removes: [], + }, + }, + ], + sessionId: buffer.sessionId, + windowId: buffer.windowId, + })), + ...texts.map((text) => ({ + size: estimateSize(text), + data: [ + { + type: INCREMENTAL_SNAPSHOT_EVENT_TYPE, + data: { + source: MUTATION_SOURCE_TYPE, + adds: [], + texts: text, + attributes: [], + removes: [], + }, + }, + ], + sessionId: buffer.sessionId, + windowId: buffer.windowId, + })), + ...attributes.map((attribute) => ({ + size: estimateSize(attribute), + data: [ + { + type: INCREMENTAL_SNAPSHOT_EVENT_TYPE, + data: { + source: MUTATION_SOURCE_TYPE, + adds: [], + texts: [], + attributes: attribute, + removes: [], + }, + }, + ], + sessionId: buffer.sessionId, + windowId: buffer.windowId, + })), + ].filter(hasIncrementalContent) + } + return [buffer] } else { return [buffer] } From 19b75f9ae082ee8dc0c74db18721832afd7f767d Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Tue, 16 Jul 2024 20:48:52 +0100 Subject: [PATCH 2/7] account for timestamp --- .../replay/sessionrecording-utils.test.ts | 15 +++++--- .../replay/sessionrecording-utils.ts | 36 ++++++++++++++----- src/extensions/replay/sessionrecording.ts | 2 +- 3 files changed, 39 insertions(+), 14 deletions(-) diff --git a/src/__tests__/extensions/replay/sessionrecording-utils.test.ts b/src/__tests__/extensions/replay/sessionrecording-utils.test.ts index dbcdf4b76..53c822fc4 100644 --- a/src/__tests__/extensions/replay/sessionrecording-utils.test.ts +++ b/src/__tests__/extensions/replay/sessionrecording-utils.test.ts @@ -249,15 +249,16 @@ 
describe(`SessionRecording utility functions`, () => { describe('splitBuffer', () => { describe('when many items in the buffer', () => { - it('should return the same buffer if size is less than SEVEN_MEGABYTES', () => { + it('should return the same buffer if size is less than the limit', () => { + const theData = new Array(100).fill(0) const buffer = { - size: 5 * 1024 * 1024, - data: new Array(100).fill(0), + size: estimateSize(theData), + data: theData, sessionId: 'session1', windowId: 'window1', } - const result = splitBuffer(buffer) + const result = splitBuffer(buffer, estimateSize(theData) + 1) expect(result).toEqual([buffer]) }) @@ -353,8 +354,9 @@ describe(`SessionRecording utility functions`, () => { }) it('should split incremental adds', () => { - const incrementalSnapshot: incrementalSnapshotEvent = { + const incrementalSnapshot: eventWithTime = { type: 3, + timestamp: 12345, data: { source: IncrementalSource.Mutation, adds: [ @@ -389,6 +391,7 @@ describe(`SessionRecording utility functions`, () => { size: 23, data: [ { + timestamp: 12343, type: 3, data: { // removes are processed first by the replayer, so we need to be sure we're emitting them first @@ -410,6 +413,7 @@ describe(`SessionRecording utility functions`, () => { size: 1048616, data: [ { + timestamp: 12344, type: 3, data: { source: 0, @@ -433,6 +437,7 @@ describe(`SessionRecording utility functions`, () => { size: 1048616, data: [ { + timestamp: 12345, type: 3, data: { source: 0, diff --git a/src/extensions/replay/sessionrecording-utils.ts b/src/extensions/replay/sessionrecording-utils.ts index 6651570a7..728ef21df 100644 --- a/src/extensions/replay/sessionrecording-utils.ts +++ b/src/extensions/replay/sessionrecording-utils.ts @@ -12,7 +12,7 @@ import { } from '@rrweb/types' import type { DataURLOptions, MaskInputFn, MaskInputOptions, MaskTextFn, Mirror, SlimDOMOptions } from 'rrweb-snapshot' -import { isObject } from '../../utils/type-utils' +import { isNullish, isObject } from 
'../../utils/type-utils' import { SnapshotBuffer } from './sessionrecording' // taken from https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Errors/Cyclic_object_value#circular_references @@ -192,13 +192,20 @@ function sliceBuffer(buffer: SnapshotBuffer, sizeLimit: number): SnapshotBuffer[ } function hasIncrementalContent(buffer: SnapshotBuffer): boolean { - return buffer.data.some( - (item) => - item.data?.adds?.length || - item.data?.removes?.length || - item.data?.texts?.length || - item.data?.attributes?.length - ) + return buffer.data.some((item) => { + const mutationData = item.data as mutationData + return ( + !isNullish(mutationData) && + (mutationData?.adds?.length || + mutationData?.removes?.length || + mutationData?.texts?.length || + mutationData?.attributes?.length) + ) + }) +} + +function countChildren(xs: any[][]): number { + return xs.reduce((acc, x) => acc + x.length, 0) } // uses a pretty high size limit to avoid splitting too much @@ -222,6 +229,15 @@ export function splitBuffer(buffer: SnapshotBuffer, sizeLimit: number = SEVEN_ME const adds = sliceList(bufferedMutations.adds, sizeLimit) const texts = sliceList(bufferedMutations.texts, sizeLimit) const attributes = sliceList(bufferedMutations.attributes, sizeLimit) + + // the incoming data has a single timestamp, so we need to adjust the timestamps of the split data, + // so we count how many children we have in total, and then we adjust the timestamp + // so that if there are 10 the first item is 9 milliseconds before the original timestamp + // and the final item has the original timestamp + const alteration = + countChildren(removes) + countChildren(adds) + countChildren(texts) + countChildren(attributes) + let timestampWiggleMarker = 1 + return [ ...removes.map((remove) => ({ size: estimateSize(remove), @@ -235,6 +251,7 @@ export function splitBuffer(buffer: SnapshotBuffer, sizeLimit: number = SEVEN_ME attributes: [], removes: remove, }, + timestamp: bufferedData.timestamp - 
alteration + timestampWiggleMarker++, }, ], sessionId: buffer.sessionId, @@ -252,6 +269,7 @@ export function splitBuffer(buffer: SnapshotBuffer, sizeLimit: number = SEVEN_ME attributes: [], removes: [], }, + timestamp: bufferedData.timestamp - alteration + timestampWiggleMarker++, }, ], sessionId: buffer.sessionId, @@ -269,6 +287,7 @@ export function splitBuffer(buffer: SnapshotBuffer, sizeLimit: number = SEVEN_ME attributes: [], removes: [], }, + timestamp: bufferedData.timestamp - alteration + timestampWiggleMarker++, }, ], sessionId: buffer.sessionId, @@ -286,6 +305,7 @@ export function splitBuffer(buffer: SnapshotBuffer, sizeLimit: number = SEVEN_ME attributes: attribute, removes: [], }, + timestamp: bufferedData.timestamp - alteration + timestampWiggleMarker++, }, ], sessionId: buffer.sessionId, diff --git a/src/extensions/replay/sessionrecording.ts b/src/extensions/replay/sessionrecording.ts index 68a8dbbb0..5ef0b5416 100644 --- a/src/extensions/replay/sessionrecording.ts +++ b/src/extensions/replay/sessionrecording.ts @@ -73,7 +73,7 @@ type SessionRecordingStatus = 'disabled' | 'sampled' | 'active' | 'buffering' export interface SnapshotBuffer { size: number - data: any[] + data: eventWithTime[] sessionId: string windowId: string } From 35b1d85615de12f4dd53cad0d94d7b2e0212b62a Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Sat, 20 Jul 2024 11:14:11 +0100 Subject: [PATCH 3/7] after testing locally with every incremental being processed --- .../replay/sessionrecording-utils.test.ts | 133 +++++------ .../replay/sessionrecording-utils.ts | 206 ++++++++---------- 2 files changed, 161 insertions(+), 178 deletions(-) diff --git a/src/__tests__/extensions/replay/sessionrecording-utils.test.ts b/src/__tests__/extensions/replay/sessionrecording-utils.test.ts index 53c822fc4..74ed66772 100644 --- a/src/__tests__/extensions/replay/sessionrecording-utils.test.ts +++ b/src/__tests__/extensions/replay/sessionrecording-utils.test.ts @@ -12,6 +12,7 @@ import { import { 
largeString, threeMBAudioURI, threeMBImageURI } from '../test_data/sessionrecording-utils-test-data' import { eventWithTime, incrementalSnapshotEvent, IncrementalSource } from '@rrweb/types' import { serializedNodeWithId } from 'rrweb-snapshot' +import { SnapshotBuffer } from '../../../extensions/replay/sessionrecording' const ONE_MEGABYTE = 1024 * 1024 const ONE_MEGABYTE_OF_DATA = 'a'.repeat(1024 * 1024) @@ -327,14 +328,14 @@ describe(`SessionRecording utility functions`, () => { }) it('should not split buffer if it has only one element', () => { - const buffer = { - size: 10 * 1024 * 1024, - data: [0], + const buffer: SnapshotBuffer = { + size: estimateSize([0]), + data: [0 as unknown as eventWithTime], sessionId: 'session1', windowId: 'window1', } - const result = splitBuffer(buffer) + const result = splitBuffer(buffer, estimateSize([0]) - 1) expect(result).toEqual([buffer]) }) @@ -342,14 +343,14 @@ describe(`SessionRecording utility functions`, () => { describe('when one item in the buffer', () => { it('should ignore full snapshots (for now)', () => { - const buffer = { - size: 5 * 1024 * 1024, - data: [{ type: '2' }], + const buffer: SnapshotBuffer = { + size: 14, + data: [{ type: '2' } as unknown as eventWithTime], sessionId: 'session1', windowId: 'window1', } - const result = splitBuffer(buffer) + const result = splitBuffer(buffer, 12) expect(result).toEqual([buffer]) }) @@ -377,8 +378,9 @@ describe(`SessionRecording utility functions`, () => { removes: [{ parentId: 1, id: 2 }], }, } + const expectedSize = estimateSize([incrementalSnapshot]) const buffer = { - size: 5 * 1024 * 1024, + size: expectedSize, data: [incrementalSnapshot], sessionId: 'session1', windowId: 'window1', @@ -386,74 +388,77 @@ describe(`SessionRecording utility functions`, () => { const result = splitBuffer(buffer, ONE_MEGABYTE * 0.9) expect(result).toHaveLength(3) + const expectedSplitRemoves = [ + { + timestamp: 12343, + type: 3, + data: { + // removes are processed first by the 
replayer, so we need to be sure we're emitting them first + removes: [{ parentId: 1, id: 2 }], + adds: [], + texts: [], + attributes: [], + source: 0, + }, + } as incrementalSnapshotEvent, + ] expect(result[0]).toEqual({ ...buffer, - size: 23, - data: [ - { - timestamp: 12343, - type: 3, - data: { - // removes are processed first by the replayer, so we need to be sure we're emitting them first - removes: [{ parentId: 1, id: 2 }], - adds: [], - texts: [], - attributes: [], - source: 0, - }, - } as incrementalSnapshotEvent, - ], + size: estimateSize(expectedSplitRemoves), + data: expectedSplitRemoves, }) + const expectedSplitAddsOne = [ + { + timestamp: 12344, + type: 3, + data: { + source: 0, + texts: [], + attributes: [], + removes: [], + adds: [ + { + parentId: 1, + nextId: null, + node: ONE_MEGABYTE_OF_DATA as unknown as serializedNodeWithId, + }, + ], + }, + }, + ] expect(result[1]).toEqual( // the two adds each only fit one at a time, so they are split in order // TODO if we sort these by timestamp at playback what's going to happen... 
// we need to maintain the original order { ...buffer, - size: 1048616, - data: [ - { - timestamp: 12344, - type: 3, - data: { - source: 0, - texts: [], - attributes: [], - removes: [], - adds: [ - { - parentId: 1, - nextId: null, - node: ONE_MEGABYTE_OF_DATA as unknown as serializedNodeWithId, - }, - ], - }, - }, - ], + size: estimateSize(expectedSplitAddsOne), + data: expectedSplitAddsOne, } ) + const expectedSplitAddsTwo = [ + { + timestamp: 12345, + type: 3, + data: { + source: 0, + texts: [], + attributes: [], + removes: [], + adds: [ + { + parentId: 2, + nextId: null, + node: ONE_MEGABYTE_OF_DATA as unknown as serializedNodeWithId, + }, + ], + }, + }, + ] expect(result[2]).toEqual({ ...buffer, - size: 1048616, - data: [ - { - timestamp: 12345, - type: 3, - data: { - source: 0, - texts: [], - attributes: [], - removes: [], - adds: [ - { - parentId: 2, - nextId: null, - node: ONE_MEGABYTE_OF_DATA as unknown as serializedNodeWithId, - }, - ], - }, - }, - ], + size: estimateSize(expectedSplitAddsTwo), + data: expectedSplitAddsTwo, }) }) }) diff --git a/src/extensions/replay/sessionrecording-utils.ts b/src/extensions/replay/sessionrecording-utils.ts index 728ef21df..d844245b7 100644 --- a/src/extensions/replay/sessionrecording-utils.ts +++ b/src/extensions/replay/sessionrecording-utils.ts @@ -191,129 +191,107 @@ function sliceBuffer(buffer: SnapshotBuffer, sizeLimit: number): SnapshotBuffer[ })) } -function hasIncrementalContent(buffer: SnapshotBuffer): boolean { - return buffer.data.some((item) => { - const mutationData = item.data as mutationData - return ( - !isNullish(mutationData) && - (mutationData?.adds?.length || - mutationData?.removes?.length || - mutationData?.texts?.length || - mutationData?.attributes?.length) - ) - }) +function hasIncrementalContent(e: eventWithTime): boolean { + const mutationData = e.data as mutationData + return ( + !isNullish(mutationData) && + (!!mutationData?.adds?.length || + !!mutationData?.removes?.length || + 
!!mutationData?.texts?.length || + !!mutationData?.attributes?.length) + ) } function countChildren(xs: any[][]): number { return xs.reduce((acc, x) => acc + x.length, 0) } -// uses a pretty high size limit to avoid splitting too much -export function splitBuffer(buffer: SnapshotBuffer, sizeLimit: number = SEVEN_MEGABYTES): SnapshotBuffer[] { - if (buffer.size >= sizeLimit && buffer.data.length > 1) { - return sliceBuffer(buffer, sizeLimit) - } else if (buffer.size >= sizeLimit && buffer.data.length === 1) { - // we can maybe split up incremental snapshots, or directly edit data image urls here - const bufferedData = buffer.data[0] - if ( - bufferedData.type === INCREMENTAL_SNAPSHOT_EVENT_TYPE && - bufferedData.data.source === MUTATION_SOURCE_TYPE - ) { - // so at this point we know that the buffer is too large, and we have a single incremental snapshot - // it may be that a single item in the buffer is too large - // or that there are many small items of one or more types that end up being too large - // rrweb processes removes, then adds, then texts, the attributes, - // so we can split them in that order - const bufferedMutations = bufferedData.data as mutationData - const removes = sliceList(bufferedMutations.removes, sizeLimit) - const adds = sliceList(bufferedMutations.adds, sizeLimit) - const texts = sliceList(bufferedMutations.texts, sizeLimit) - const attributes = sliceList(bufferedMutations.attributes, sizeLimit) +function splitIncrementalData(bufferedData: eventWithTime, sizeLimit: number): eventWithTime[] { + // NB: this isn't checking the size so will _always_ split incremental snapshots + if (bufferedData.type === INCREMENTAL_SNAPSHOT_EVENT_TYPE && bufferedData.data.source === MUTATION_SOURCE_TYPE) { + // so at this point we know that the buffer is too large, and we have a single incremental snapshot + // it may be that a single item in the buffer is too large + // or that there are many small items of one or more types that end up being too large + 
// rrweb processes removes, then adds, then texts, the attributes, + // so we can split them in that order + const bufferedMutations = bufferedData.data as mutationData + const removes = sliceList(bufferedMutations.removes || [], sizeLimit) + const adds = sliceList(bufferedMutations.adds || [], sizeLimit) + const texts = sliceList(bufferedMutations.texts || [], sizeLimit) + const attributes = sliceList(bufferedMutations.attributes || [], sizeLimit) - // the incoming data has a single timestamp, so we need to adjust the timestamps of the split data, - // so we count how many children we have in total, and then we adjust the timestamp - // so that if there are 10 the first item is 9 milliseconds before the original timestamp - // and the final item has the original timestamp - const alteration = - countChildren(removes) + countChildren(adds) + countChildren(texts) + countChildren(attributes) - let timestampWiggleMarker = 1 + // the incoming data has a single timestamp, so we need to adjust the timestamps of the split data, + // so we count how many children we have in total, and then we adjust the timestamp + // so that if there are 10 the first item is 9 milliseconds before the original timestamp + // and the final item has the original timestamp + const alteration = + countChildren(removes) + countChildren(adds) + countChildren(texts) + countChildren(attributes) + let timestampWiggleMarker = 1 - return [ - ...removes.map((remove) => ({ - size: estimateSize(remove), - data: [ - { - type: INCREMENTAL_SNAPSHOT_EVENT_TYPE, - data: { - source: MUTATION_SOURCE_TYPE, - adds: [], - texts: [], - attributes: [], - removes: remove, - }, - timestamp: bufferedData.timestamp - alteration + timestampWiggleMarker++, - }, - ], - sessionId: buffer.sessionId, - windowId: buffer.windowId, - })), - ...adds.map((add) => ({ - size: estimateSize(add), - data: [ - { - type: INCREMENTAL_SNAPSHOT_EVENT_TYPE, - data: { - source: MUTATION_SOURCE_TYPE, - adds: add, - texts: [], - attributes: 
[], - removes: [], - }, - timestamp: bufferedData.timestamp - alteration + timestampWiggleMarker++, - }, - ], - sessionId: buffer.sessionId, - windowId: buffer.windowId, - })), - ...texts.map((text) => ({ - size: estimateSize(text), - data: [ - { - type: INCREMENTAL_SNAPSHOT_EVENT_TYPE, - data: { - source: MUTATION_SOURCE_TYPE, - adds: [], - texts: text, - attributes: [], - removes: [], - }, - timestamp: bufferedData.timestamp - alteration + timestampWiggleMarker++, - }, - ], - sessionId: buffer.sessionId, - windowId: buffer.windowId, - })), - ...attributes.map((attribute) => ({ - size: estimateSize(attribute), - data: [ - { - type: INCREMENTAL_SNAPSHOT_EVENT_TYPE, - data: { - source: MUTATION_SOURCE_TYPE, - adds: [], - texts: [], - attributes: attribute, - removes: [], - }, - timestamp: bufferedData.timestamp - alteration + timestampWiggleMarker++, - }, - ], - sessionId: buffer.sessionId, - windowId: buffer.windowId, - })), - ].filter(hasIncrementalContent) + return [ + ...removes.map((remove) => ({ + type: INCREMENTAL_SNAPSHOT_EVENT_TYPE, + data: { + source: MUTATION_SOURCE_TYPE, + adds: [], + texts: [], + attributes: [], + removes: remove, + }, + timestamp: bufferedData.timestamp - alteration + timestampWiggleMarker++, + })), + ...adds.map((add) => ({ + type: INCREMENTAL_SNAPSHOT_EVENT_TYPE, + data: { + source: MUTATION_SOURCE_TYPE, + adds: add, + texts: [], + attributes: [], + removes: [], + }, + timestamp: bufferedData.timestamp - alteration + timestampWiggleMarker++, + })), + ...texts.map((text) => ({ + type: INCREMENTAL_SNAPSHOT_EVENT_TYPE, + data: { + source: MUTATION_SOURCE_TYPE, + adds: [], + texts: text, + attributes: [], + removes: [], + }, + timestamp: bufferedData.timestamp - alteration + timestampWiggleMarker++, + })), + ...attributes.map((attribute) => ({ + type: INCREMENTAL_SNAPSHOT_EVENT_TYPE, + data: { + source: MUTATION_SOURCE_TYPE, + adds: [], + texts: [], + attributes: attribute, + removes: [], + }, + timestamp: bufferedData.timestamp - 
alteration + timestampWiggleMarker++, + })), + ].filter(hasIncrementalContent) + } else { + return [bufferedData] + } +} + +// uses a pretty high size limit to avoid splitting too much +export function splitBuffer(buffer: SnapshotBuffer, sizeLimit: number = SEVEN_MEGABYTES): SnapshotBuffer[] { + if (buffer.size >= sizeLimit) { + // it may be because one or more incremental snapshots is very large + const splitData = buffer.data.map((bd) => splitIncrementalData(bd, sizeLimit)).flat() + const splitBuffer: SnapshotBuffer = { + size: estimateSize(splitData), + data: splitData, + sessionId: buffer.sessionId, + windowId: buffer.windowId, } - return [buffer] + // or because the array of snapshots in the buffer is now too large + return sliceBuffer(splitBuffer, sizeLimit) } else { return [buffer] } From dc488afe454818032868ef0a09956807281c2342 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Sat, 20 Jul 2024 11:15:23 +0100 Subject: [PATCH 4/7] order --- src/extensions/replay/sessionrecording-utils.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/extensions/replay/sessionrecording-utils.ts b/src/extensions/replay/sessionrecording-utils.ts index d844245b7..dfcf45fee 100644 --- a/src/extensions/replay/sessionrecording-utils.ts +++ b/src/extensions/replay/sessionrecording-utils.ts @@ -233,10 +233,10 @@ function splitIncrementalData(bufferedData: eventWithTime, sizeLimit: number): e type: INCREMENTAL_SNAPSHOT_EVENT_TYPE, data: { source: MUTATION_SOURCE_TYPE, + removes: remove, adds: [], texts: [], attributes: [], - removes: remove, }, timestamp: bufferedData.timestamp - alteration + timestampWiggleMarker++, })), @@ -244,10 +244,10 @@ function splitIncrementalData(bufferedData: eventWithTime, sizeLimit: number): e type: INCREMENTAL_SNAPSHOT_EVENT_TYPE, data: { source: MUTATION_SOURCE_TYPE, + removes: [], adds: add, texts: [], attributes: [], - removes: [], }, timestamp: bufferedData.timestamp - alteration + timestampWiggleMarker++, })), @@ 
-255,10 +255,10 @@ function splitIncrementalData(bufferedData: eventWithTime, sizeLimit: number): e type: INCREMENTAL_SNAPSHOT_EVENT_TYPE, data: { source: MUTATION_SOURCE_TYPE, + removes: [], adds: [], texts: text, attributes: [], - removes: [], }, timestamp: bufferedData.timestamp - alteration + timestampWiggleMarker++, })), @@ -266,10 +266,10 @@ function splitIncrementalData(bufferedData: eventWithTime, sizeLimit: number): e type: INCREMENTAL_SNAPSHOT_EVENT_TYPE, data: { source: MUTATION_SOURCE_TYPE, + removes: [], adds: [], texts: [], attributes: attribute, - removes: [], }, timestamp: bufferedData.timestamp - alteration + timestampWiggleMarker++, })), From 3f41b34ad6a93ef8ffdf8692ef4a4965ee042662 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Sat, 20 Jul 2024 11:22:01 +0100 Subject: [PATCH 5/7] refactor --- .../replay/sessionrecording-utils.ts | 78 ++++++++----------- 1 file changed, 34 insertions(+), 44 deletions(-) diff --git a/src/extensions/replay/sessionrecording-utils.ts b/src/extensions/replay/sessionrecording-utils.ts index dfcf45fee..9fe9bedcb 100644 --- a/src/extensions/replay/sessionrecording-utils.ts +++ b/src/extensions/replay/sessionrecording-utils.ts @@ -5,6 +5,7 @@ import { KeepIframeSrcFn, listenerHandler, maskTextClass, + mutationCallbackParam, mutationData, pluginEvent, RecordPlugin, @@ -206,6 +207,21 @@ function countChildren(xs: any[][]): number { return xs.reduce((acc, x) => acc + x.length, 0) } +function incrementalSnapshotFrom(mutationData: Partial<mutationCallbackParam>, timestamp: number) { + return { + type: INCREMENTAL_SNAPSHOT_EVENT_TYPE, + data: { + source: MUTATION_SOURCE_TYPE, + removes: [], + adds: [], + texts: [], + attributes: [], + ...mutationData, + }, + timestamp: timestamp, + } +} + function splitIncrementalData(bufferedData: eventWithTime, sizeLimit: number): eventWithTime[] { // NB: this isn't checking the size so will _always_ split incremental snapshots if (bufferedData.type === INCREMENTAL_SNAPSHOT_EVENT_TYPE && bufferedData.data.source
=== MUTATION_SOURCE_TYPE) { @@ -229,50 +245,24 @@ function splitIncrementalData(bufferedData: eventWithTime, sizeLimit: number): e let timestampWiggleMarker = 1 return [ - ...removes.map((remove) => ({ - type: INCREMENTAL_SNAPSHOT_EVENT_TYPE, - data: { - source: MUTATION_SOURCE_TYPE, - removes: remove, - adds: [], - texts: [], - attributes: [], - }, - timestamp: bufferedData.timestamp - alteration + timestampWiggleMarker++, - })), - ...adds.map((add) => ({ - type: INCREMENTAL_SNAPSHOT_EVENT_TYPE, - data: { - source: MUTATION_SOURCE_TYPE, - removes: [], - adds: add, - texts: [], - attributes: [], - }, - timestamp: bufferedData.timestamp - alteration + timestampWiggleMarker++, - })), - ...texts.map((text) => ({ - type: INCREMENTAL_SNAPSHOT_EVENT_TYPE, - data: { - source: MUTATION_SOURCE_TYPE, - removes: [], - adds: [], - texts: text, - attributes: [], - }, - timestamp: bufferedData.timestamp - alteration + timestampWiggleMarker++, - })), - ...attributes.map((attribute) => ({ - type: INCREMENTAL_SNAPSHOT_EVENT_TYPE, - data: { - source: MUTATION_SOURCE_TYPE, - removes: [], - adds: [], - texts: [], - attributes: attribute, - }, - timestamp: bufferedData.timestamp - alteration + timestampWiggleMarker++, - })), + ...removes.map((remove) => + incrementalSnapshotFrom( + { removes: remove }, + bufferedData.timestamp - alteration + timestampWiggleMarker++ + ) + ), + ...adds.map((add) => + incrementalSnapshotFrom({ adds: add }, bufferedData.timestamp - alteration + timestampWiggleMarker++) + ), + ...texts.map((text) => + incrementalSnapshotFrom({ texts: text }, bufferedData.timestamp - alteration + timestampWiggleMarker++) + ), + ...attributes.map((attribute) => + incrementalSnapshotFrom( + { attributes: attribute }, + bufferedData.timestamp - alteration + timestampWiggleMarker++ + ) + ), ].filter(hasIncrementalContent) } else { return [bufferedData] From 54878a2583103001d5b94e5dc4c5eeb5332893c9 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Sat, 20 Jul 2024 11:30:20 +0100 
Subject: [PATCH 6/7] one fewer estimate --- src/extensions/replay/sessionrecording-utils.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/extensions/replay/sessionrecording-utils.ts b/src/extensions/replay/sessionrecording-utils.ts index 9fe9bedcb..48c732240 100644 --- a/src/extensions/replay/sessionrecording-utils.ts +++ b/src/extensions/replay/sessionrecording-utils.ts @@ -275,7 +275,8 @@ export function splitBuffer(buffer: SnapshotBuffer, sizeLimit: number = SEVEN_ME // it may be because one or more incremental snapshots is very large const splitData = buffer.data.map((bd) => splitIncrementalData(bd, sizeLimit)).flat() const splitBuffer: SnapshotBuffer = { - size: estimateSize(splitData), + // NB this is no longer totally accurate but will be replaced in sliceBuffer below + size: buffer.size, data: splitData, sessionId: buffer.sessionId, windowId: buffer.windowId, From eab44881a9de1f02765512acafc8445c501c0a2d Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Sat, 20 Jul 2024 11:36:29 +0100 Subject: [PATCH 7/7] we already have a max message size, just use that --- .../extensions/replay/sessionrecording-utils.test.ts | 4 ++-- src/extensions/replay/sessionrecording-utils.ts | 4 +--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/__tests__/extensions/replay/sessionrecording-utils.test.ts b/src/__tests__/extensions/replay/sessionrecording-utils.test.ts index 74ed66772..3d5dcc531 100644 --- a/src/__tests__/extensions/replay/sessionrecording-utils.test.ts +++ b/src/__tests__/extensions/replay/sessionrecording-utils.test.ts @@ -5,7 +5,6 @@ import { FULL_SNAPSHOT_EVENT_TYPE, PLUGIN_EVENT_TYPE, replacementImageURI, - SEVEN_MEGABYTES, splitBuffer, truncateLargeConsoleLogs, } from '../../../extensions/replay/sessionrecording-utils' @@ -15,6 +14,7 @@ import { serializedNodeWithId } from 'rrweb-snapshot' import { SnapshotBuffer } from '../../../extensions/replay/sessionrecording' const ONE_MEGABYTE = 1024 * 1024 +const 
SEVEN_MEGABYTES = ONE_MEGABYTE * 7 * 0.9 // ~7mb (with some wiggle room) const ONE_MEGABYTE_OF_DATA = 'a'.repeat(1024 * 1024) describe(`SessionRecording utility functions`, () => { @@ -293,7 +293,7 @@ describe(`SessionRecording utility functions`, () => { windowId: 'window1', } - const result = splitBuffer(buffer) + const result = splitBuffer(buffer, SEVEN_MEGABYTES) expect(result.length).toBe(15) let partTotal = 0 diff --git a/src/extensions/replay/sessionrecording-utils.ts b/src/extensions/replay/sessionrecording-utils.ts index 48c732240..ad5931c7b 100644 --- a/src/extensions/replay/sessionrecording-utils.ts +++ b/src/extensions/replay/sessionrecording-utils.ts @@ -165,8 +165,6 @@ export function truncateLargeConsoleLogs(_event: eventWithTime) { return _event } -export const SEVEN_MEGABYTES = 1024 * 1024 * 7 * 0.9 // ~7mb (with some wiggle room) - function sliceList(list: any[], sizeLimit: number): any[][] { const size = estimateSize(list) if (size < sizeLimit) { @@ -270,7 +268,7 @@ function splitIncrementalData(bufferedData: eventWithTime, sizeLimit: number): e } // uses a pretty high size limit to avoid splitting too much -export function splitBuffer(buffer: SnapshotBuffer, sizeLimit: number = SEVEN_MEGABYTES): SnapshotBuffer[] { +export function splitBuffer(buffer: SnapshotBuffer, sizeLimit: number = MAX_MESSAGE_SIZE): SnapshotBuffer[] { if (buffer.size >= sizeLimit) { // it may be because one or more incremental snapshots is very large const splitData = buffer.data.map((bd) => splitIncrementalData(bd, sizeLimit)).flat()