Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add an explicit in-memory buffer #1217

Merged
merged 7 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions src/__tests__/extensions/replay/sessionrecording.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ describe('SessionRecording', () => {

sessionRecording.afterDecideResponse(makeDecideResponse({ sessionRecording: undefined }))
expect(sessionRecording['status']).toBe('disabled')
expect(sessionRecording['buffer']?.data.length).toEqual(undefined)
expect(sessionRecording['buffer'].data.length).toEqual(0)
expect(posthog.capture).not.toHaveBeenCalled()
})

Expand Down Expand Up @@ -624,7 +624,7 @@ describe('SessionRecording', () => {

// access private method 🤯so we don't need to wait for the timer
sessionRecording['_flushBuffer']()
expect(sessionRecording['buffer']?.data.length).toEqual(undefined)
expect(sessionRecording['buffer'].data.length).toEqual(0)

expect(posthog.capture).toHaveBeenCalledTimes(1)
expect(posthog.capture).toHaveBeenCalledWith(
Expand Down Expand Up @@ -700,7 +700,7 @@ describe('SessionRecording', () => {
// Another big event means the old data will be flushed
_emit(createIncrementalSnapshot({ data: { source: 1, payload: bigData } }))
expect(posthog.capture).toHaveBeenCalled()
expect(sessionRecording['buffer']?.data.length).toEqual(1) // The new event
expect(sessionRecording['buffer'].data.length).toEqual(1) // The new event
expect(sessionRecording['buffer']).toMatchObject({ size: 755017 })
})

Expand All @@ -712,7 +712,7 @@ describe('SessionRecording', () => {

_emit(createIncrementalSnapshot({ data: { source: 1, payload: bigData } }))
expect(sessionRecording['buffer']).toMatchObject({ size: 755037 }) // the size of the big data event
expect(sessionRecording['buffer']?.data.length).toEqual(2) // full snapshot and a big event
expect(sessionRecording['buffer'].data.length).toEqual(2) // full snapshot and a big event

_emit(createIncrementalSnapshot({ data: { source: 1, payload: 1 } }))
_emit(createIncrementalSnapshot({ data: { source: 1, payload: 2 } }))
Expand All @@ -725,21 +725,21 @@ describe('SessionRecording', () => {
// but the recording is still buffering
expect(sessionRecording['status']).toBe('buffering')
expect(posthog.capture).not.toHaveBeenCalled()
expect(sessionRecording['buffer']?.data.length).toEqual(5) // + the new event
expect(sessionRecording['buffer'].data.length).toEqual(5) // + the new event
expect(sessionRecording['buffer']).toMatchObject({ size: 755037 + 755101 }) // the size of the big data event
})

it('flushes buffer if the session_id changes', () => {
sessionRecording.afterDecideResponse(makeDecideResponse({ sessionRecording: { endpoint: '/s/' } }))
sessionRecording.startIfEnabledOrStop()

expect(sessionRecording['buffer']?.sessionId).toEqual(sessionId)
expect(sessionRecording['buffer'].sessionId).toEqual(sessionId)

_emit(createIncrementalSnapshot({ emit: 1 }))

expect(posthog.capture).not.toHaveBeenCalled()
expect(sessionRecording['buffer']?.sessionId).not.toEqual(null)
expect(sessionRecording['buffer']?.data).toEqual([
expect(sessionRecording['buffer'].sessionId).not.toEqual(null)
expect(sessionRecording['buffer'].data).toEqual([
createFullSnapshot(),
{ data: { source: 1 }, emit: 1, type: 3 },
])
Expand Down Expand Up @@ -1524,7 +1524,7 @@ describe('SessionRecording', () => {
expect(sessionRecording['sessionDuration']).toBe(100)
expect(sessionRecording['minimumDuration']).toBe(1500)

expect(sessionRecording['buffer']?.data.length).toBe(2) // full snapshot and the emitted incremental event
expect(sessionRecording['buffer'].data.length).toBe(2) // full snapshot and the emitted incremental event
// call the private method to avoid waiting for the timer
sessionRecording['_flushBuffer']()

Expand All @@ -1549,7 +1549,7 @@ describe('SessionRecording', () => {
expect(sessionRecording['sessionDuration']).toBe(-1000)
expect(sessionRecording['minimumDuration']).toBe(1500)

expect(sessionRecording['buffer']?.data.length).toBe(2) // full snapshot and the emitted incremental event
expect(sessionRecording['buffer'].data.length).toBe(2) // full snapshot and the emitted incremental event
// call the private method to avoid waiting for the timer
sessionRecording['_flushBuffer']()

Expand All @@ -1569,29 +1569,29 @@ describe('SessionRecording', () => {
expect(sessionRecording['sessionDuration']).toBe(100)
expect(sessionRecording['minimumDuration']).toBe(1500)

expect(sessionRecording['buffer']?.data.length).toBe(2) // full snapshot and the emitted incremental event
expect(sessionRecording['buffer'].data.length).toBe(2) // full snapshot and the emitted incremental event
// call the private method to avoid waiting for the timer
sessionRecording['_flushBuffer']()

expect(posthog.capture).not.toHaveBeenCalled()

_emit(createIncrementalSnapshot({ data: { source: 1 }, timestamp: sessionStartTimestamp + 1501 }))

expect(sessionRecording['buffer']?.data.length).toBe(3) // full snapshot and two emitted incremental events
expect(sessionRecording['buffer'].data.length).toBe(3) // full snapshot and two emitted incremental events
// call the private method to avoid waiting for the timer
sessionRecording['_flushBuffer']()

expect(posthog.capture).toHaveBeenCalled()
expect(sessionRecording['buffer']?.data.length).toBe(undefined)
expect(sessionRecording['buffer'].data.length).toBe(0)
expect(sessionRecording['sessionDuration']).toBe(null)
_emit(createIncrementalSnapshot({ data: { source: 1 }, timestamp: sessionStartTimestamp + 1502 }))
expect(sessionRecording['buffer']?.data.length).toBe(1)
expect(sessionRecording['buffer'].data.length).toBe(1)
expect(sessionRecording['sessionDuration']).toBe(1502)
// call the private method to avoid waiting for the timer
sessionRecording['_flushBuffer']()

expect(posthog.capture).toHaveBeenCalled()
expect(sessionRecording['buffer']?.data.length).toBe(undefined)
expect(sessionRecording['buffer'].data.length).toBe(0)
})
})

Expand Down
87 changes: 43 additions & 44 deletions src/extensions/replay/sessionrecording.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,34 @@ type SessionRecordingStatus = 'disabled' | 'sampled' | 'active' | 'buffering'
interface SnapshotBuffer {
size: number
data: any[]
sessionId: string | null
windowId: string | null
sessionId: string
windowId: string

readonly mostRecentSnapshotTimestamp: number | null
add(properties: Properties): void
}

class InMemoryBuffer implements SnapshotBuffer {
size: number
data: any[]
sessionId: string
windowId: string

get mostRecentSnapshotTimestamp(): number | null {
return this.data.length ? this.data[this.data.length - 1].timestamp : null
}

constructor(sessionId: string, windowId: string) {
this.size = 0
this.data = []
this.sessionId = sessionId
this.windowId = windowId
}

add(properties: Properties) {
this.size += properties.$snapshot_bytes
this.data.push(properties.$snapshot_data)
}
}

interface QueuedRRWebEvent {
Expand All @@ -112,12 +138,11 @@ const newQueuedEvent = (rrwebMethod: () => void): QueuedRRWebEvent => ({
const LOGGER_PREFIX = '[SessionRecording]'

export class SessionRecording {
private instance: PostHog
private _endpoint: string
private flushBufferTimer?: any

// we have a buffer - that contains PostHog snapshot events ready to be sent to the server
private buffer?: SnapshotBuffer
private buffer: SnapshotBuffer
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no benefit to this being allowed to be undefined

// and a queue - that contains rrweb events that we want to send to rrweb, but rrweb wasn't able to accept them yet
private queuedRRWebEvents: QueuedRRWebEvent[] = []

Expand Down Expand Up @@ -153,7 +178,6 @@ export class SessionRecording {

private get sessionManager() {
if (!this.instance.sessionManager) {
logger.error(LOGGER_PREFIX + ' started without valid sessionManager')
throw new Error(LOGGER_PREFIX + ' started without valid sessionManager. This is a bug.')
}

Expand All @@ -166,9 +190,9 @@ export class SessionRecording {
}

private get sessionDuration(): number | null {
const mostRecentSnapshot = this.buffer?.data[this.buffer?.data.length - 1]
const mostRecentSnapshotTimestamp = this.buffer.mostRecentSnapshotTimestamp
const { sessionStartTimestamp } = this.sessionManager.checkAndGetSessionAndWindowId(true)
return mostRecentSnapshot ? mostRecentSnapshot.timestamp - sessionStartTimestamp : null
return mostRecentSnapshotTimestamp ? mostRecentSnapshotTimestamp - sessionStartTimestamp : null
}

private get isRecordingEnabled() {
Expand Down Expand Up @@ -250,8 +274,7 @@ export class SessionRecording {
}
}

constructor(instance: PostHog) {
this.instance = instance
constructor(private readonly instance: PostHog) {
this._captureStarted = false
this._endpoint = BASE_ENDPOINT
this.stopRrweb = undefined
Expand Down Expand Up @@ -286,7 +309,7 @@ export class SessionRecording {
this.sessionId = sessionId
this.windowId = windowId

this.buffer = this.clearBuffer()
this.buffer = new InMemoryBuffer(this.sessionId, this.windowId)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

explicitly assigning it so that it doesn't need to be undefinable


// on reload there might be an already sampled session that should be continued before decide response,
// so we call this here _and_ in the decide response
Expand Down Expand Up @@ -846,24 +869,11 @@ export class SessionRecording {
return url
}

private clearBuffer(): SnapshotBuffer {
this.buffer = undefined

return {
size: 0,
data: [],
sessionId: this.sessionId,
windowId: this.windowId,
}
private clearBuffer(): void {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but keeping this convenience method for clarity

this.buffer = new InMemoryBuffer(this.sessionId, this.windowId)
}

// the intention is a buffer that (currently) is used only after a decide response enables session recording
// it is called ever X seconds using the flushBufferTimer so that we don't have to wait for the buffer to fill up
// when it is called on a timer it assumes that it can definitely flush
// it is flushed when the session id changes or the size of the buffered data gets too great (1mb by default)
// first change: if the recording is in buffering mode,
// flush buffer simply resets the timer and returns the existing flush buffer
private _flushBuffer() {
private _flushBuffer(): void {
if (this.flushBufferTimer) {
clearTimeout(this.flushBufferTimer)
this.flushBufferTimer = undefined
Expand All @@ -881,42 +891,31 @@ export class SessionRecording {
this.flushBufferTimer = setTimeout(() => {
this._flushBuffer()
}, RECORDING_BUFFER_TIMEOUT)
return this.buffer || this.clearBuffer()

return
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.buffer can never be falsy anymore (and never would have been at this point anyway)

}

if (this.buffer && this.buffer.data.length !== 0) {
if (this.buffer.data.length > 0) {
this._captureSnapshot({
$snapshot_bytes: this.buffer.size,
$snapshot_data: this.buffer.data,
$session_id: this.buffer.sessionId,
$window_id: this.buffer.windowId,
})

return this.clearBuffer()
} else {
return this.buffer || this.clearBuffer()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.buffer never falsy anymore so no need for this branch

}
this.clearBuffer()
}

private _captureSnapshotBuffered(properties: Properties) {
const additionalBytes = 2 + (this.buffer?.data.length || 0) // 2 bytes for the array brackets and 1 byte for each comma
if (
!this.buffer ||
this.buffer.size + properties.$snapshot_bytes + additionalBytes > RECORDING_MAX_EVENT_SIZE ||
(!!this.buffer.sessionId && this.buffer.sessionId !== this.sessionId)
this.buffer.sessionId !== this.sessionId
) {
this.buffer = this._flushBuffer()
}

if (isNull(this.buffer.sessionId) && !isNull(this.sessionId)) {
// session id starts null but has now been assigned, update the buffer
this.buffer.sessionId = this.sessionId
this.buffer.windowId = this.windowId
Comment on lines -911 to -914
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buffer always has a session id now

this._flushBuffer()
}

this.buffer.size += properties.$snapshot_bytes
this.buffer.data.push(properties.$snapshot_data)

this.buffer.add(properties)
if (!this.flushBufferTimer) {
this.flushBufferTimer = setTimeout(() => {
this._flushBuffer()
Expand Down
Loading