Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): Reduce large relay message payloads to protect Redis #12342

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/@n8n/config/src/configs/logging.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export const LOG_SCOPES = [
'multi-main-setup',
'pruning',
'pubsub',
'push',
'redis',
'scaling',
'waiting-executions',
Expand Down Expand Up @@ -70,10 +71,13 @@ export class LoggingConfig {
* - `external-secrets`
* - `license`
* - `multi-main-setup`
* - `pruning`
* - `pubsub`
* - `push`
* - `redis`
* - `scaling`
* - `waiting-executions`
* - `task-runner`
*
* @example
* `N8N_LOG_SCOPES=license`
Expand Down
19 changes: 18 additions & 1 deletion packages/cli/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { readFileSync } from 'fs';
import type { n8n } from 'n8n-core';
import { jsonParse } from 'n8n-workflow';
import type { ITaskDataConnections } from 'n8n-workflow';
import { jsonParse, TRIMMED_TASK_DATA_CONNECTIONS_KEY } from 'n8n-workflow';
import { resolve, join, dirname } from 'path';

const { NODE_ENV, E2E_TESTS } = process.env;
Expand Down Expand Up @@ -159,6 +160,22 @@ export const ARTIFICIAL_TASK_DATA = {
],
};

/**
* Connections for an item standing in for a manual execution data item too
* large to be sent live via pubsub. This signals to the client to direct the
* user to the execution history.
*/
export const TRIMMED_TASK_DATA_CONNECTIONS: ITaskDataConnections = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should move this to either the core or the workflow package so we can share the logic with the FE instead of relying on magic strings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see 17ff5a1

main: [
[
{
json: { [TRIMMED_TASK_DATA_CONNECTIONS_KEY]: true },
pairedItem: undefined,
},
],
],
};

/** Lowest priority, meaning shut down happens after other groups */
export const LOWEST_SHUTDOWN_PRIORITY = 0;
export const DEFAULT_SHUTDOWN_PRIORITY = 100;
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/push/__tests__/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ describe('Push', () => {

test('should validate pushRef on requests for websocket backend', () => {
config.set('push.backend', 'websocket');
const push = new Push(mock(), mock());
const push = new Push(mock(), mock(), mock());
const ws = mock<WebSocket>();
const request = mock<WebSocketPushRequest>({ user, ws });
request.query = { pushRef: '' };
Expand All @@ -33,7 +33,7 @@ describe('Push', () => {

test('should validate pushRef on requests for SSE backend', () => {
config.set('push.backend', 'sse');
const push = new Push(mock(), mock());
const push = new Push(mock(), mock(), mock());
const request = mock<SSEPushRequest>({ user, ws: undefined });
request.query = { pushRef: '' };
expect(() => push.handleRequest(request, mock())).toThrow(BadRequestError);
Expand Down
92 changes: 73 additions & 19 deletions packages/cli/src/push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@ import type { Application } from 'express';
import { ServerResponse } from 'http';
import type { Server } from 'http';
import { InstanceSettings } from 'n8n-core';
import { deepCopy } from 'n8n-workflow';
import type { Socket } from 'net';
import { Container, Service } from 'typedi';
import { parse as parseUrl } from 'url';
import { Server as WSServer } from 'ws';

import { AuthService } from '@/auth/auth.service';
import config from '@/config';
import { TRIMMED_TASK_DATA_CONNECTIONS } from '@/constants';
import type { User } from '@/databases/entities/user';
import { OnShutdown } from '@/decorators/on-shutdown';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { Logger } from '@/logging/logger.service';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { TypedEmitter } from '@/typed-emitter';

Expand All @@ -27,6 +30,12 @@ type PushEvents = {

const useWebSockets = config.getEnv('push.backend') === 'websocket';

/**
 * Max allowed size of a push message in bytes (5 MiB). Events going through
 * the pubsub channel are trimmed if exceeding this size.
 *
 * The size compared against this limit is the UTF-8 byte length of the
 * JSON-serialized `data` field of the push message (see `relayViaPubSub`),
 * not of the whole message envelope.
 */
const MAX_PAYLOAD_SIZE_BYTES = 5 * 1024 * 1024; // 5 MiB

/**
* Push service for uni- or bi-directional communication with frontend clients.
* Uses either server-sent events (SSE, unidirectional from backend --> frontend)
Expand All @@ -43,8 +52,10 @@ export class Push extends TypedEmitter<PushEvents> {
/**
 * Wires up the push service with its injected dependencies.
 *
 * @param instanceSettings Settings of the current instance (read to decide
 *   whether messages are relayed via pubsub).
 * @param publisher Publisher used to relay push messages to other instances.
 * @param logger Logger; replaced on construction by its `push`-scoped variant.
 */
constructor(
	private readonly instanceSettings: InstanceSettings,
	private readonly publisher: Publisher,
	private readonly logger: Logger,
) {
	super();

	// From here on, everything this service logs goes through the `push` scope.
	this.logger = this.logger.scoped('push');

	// Only the websocket backend has incoming messages to forward.
	if (!useWebSockets) return;

	// Re-emit every backend message on this emitter so that listeners can
	// subscribe to the service instead of the concrete backend.
	this.backend.on('message', (incoming) => this.emit('message', incoming));
}
Expand Down Expand Up @@ -91,25 +102,8 @@ export class Push extends TypedEmitter<PushEvents> {
}

send(pushMsg: PushMessage, pushRef: string) {
const { isWorker, isMultiMain } = this.instanceSettings;

/**
* In scaling mode, in single- or multi-main setup, in a manual execution,
* a worker relays execution lifecycle events to all mains. Only the main
* who holds the session for the execution will push to the frontend who
* commissioned the execution.
*
* In scaling mode, in multi-main setup, in a manual webhook execution, if
* the main who handles a webhook is not the main who created the webhook,
* the handler main relays execution lifecycle events to all mains. Only
* the main who holds the session for the execution will push events to
* the frontend who commissioned the execution.
*/
if (isWorker || (isMultiMain && !this.hasPushRef(pushRef))) {
void this.publisher.publishCommand({
command: 'relay-execution-lifecycle-event',
payload: { ...pushMsg, pushRef },
});
if (this.shouldRelayViaPubSub(pushRef)) {
this.relayViaPubSub(pushMsg, pushRef);
Comment on lines +105 to +106
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO this logic shouldn't be in the Push class. It now ties the logic to implementation details of scaling mode, whereas Push should only be concerned with communicating via the websocket (or SSE). Since this was already here before, we don't have to move it elsewhere in this PR. I just wanted to mention this for the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, we need to work on the lifecycle hooks first to unlock this refactor.

return;
}

Expand All @@ -124,6 +118,66 @@ export class Push extends TypedEmitter<PushEvents> {
onShutdown() {
this.backend.closeAllConnections();
}

/**
 * Decide whether a push message has to be relayed to other instances over
 * the pubsub channel rather than being pushed straight to the frontend.
 *
 * Relaying is required in two scaling-mode scenarios:
 *
 * 1. Manual execution on a worker (single- or multi-main setup). A worker
 *    has no connection to a frontend, so it relays lifecycle events for
 *    manual executions to all mains. Only the main holding the session for
 *    the execution pushes to the frontend that commissioned it.
 *
 * 2. Manual webhook execution in multi-main setup. If the main handling a
 *    webhook is not the main that created it, the handler main relays
 *    execution lifecycle events to all mains. Again, only the main holding
 *    the session for the execution pushes to the commissioning frontend.
 *
 * @param pushRef Reference identifying the frontend session to push to.
 * @returns `true` if the message must go through pubsub, `false` if this
 *   instance can push it directly.
 */
private shouldRelayViaPubSub(pushRef: string) {
	const { isWorker: runsOnWorker, isMultiMain: inMultiMainSetup } = this.instanceSettings;

	// Scenario 1: workers always relay.
	if (runsOnWorker) return true;

	// Scenario 2: in multi-main setup, relay unless this main holds the session.
	return inMultiMainSetup && !this.hasPushRef(pushRef);
}

/**
* Relay a push message via the `n8n.commands` pubsub channel,
* reducing the payload size if too large.
*
* See {@link shouldRelayViaPubSub} for more details.
*/
private relayViaPubSub(pushMsg: PushMessage, pushRef: string) {
const eventSizeBytes = new TextEncoder().encode(JSON.stringify(pushMsg.data)).length;

if (eventSizeBytes <= MAX_PAYLOAD_SIZE_BYTES) {
void this.publisher.publishCommand({
command: 'relay-execution-lifecycle-event',
payload: { ...pushMsg, pushRef },
});
return;
}

// too large for pubsub channel, trim it

const pushMsgCopy = deepCopy(pushMsg);

const toMb = (bytes: number) => (bytes / (1024 * 1024)).toFixed(0);
const eventMb = toMb(eventSizeBytes);
const maxMb = toMb(MAX_PAYLOAD_SIZE_BYTES);
const { type } = pushMsgCopy;

this.logger.warn(`Size of "${type}" (${eventMb} MB) exceeds max size ${maxMb} MB. Trimming...`);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to also log the node that caused this. For that, we would have to do the trimming earlier in the chain so that the node is available.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100%, same as here


if (type === 'nodeExecuteAfter') pushMsgCopy.data.data.data = TRIMMED_TASK_DATA_CONNECTIONS;
else if (type === 'executionFinished') pushMsgCopy.data.rawData = ''; // prompt client to fetch from DB

void this.publisher.publishCommand({
command: 'relay-execution-lifecycle-event',
payload: { ...pushMsgCopy, pushRef },
});
}
}

export const setupPushServer = (restEndpoint: string, server: Server, app: Application) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { createWorkflow, shareWorkflowWithUsers } from '@test-integration/db/wor
import * as testDb from '@test-integration/test-db';

describe('CollaborationService', () => {
mockInstance(Push, new Push(mock(), mock()));
mockInstance(Push, new Push(mock(), mock(), mock()));
let pushService: Push;
let collaborationService: CollaborationService;
let owner: User;
Expand Down
1 change: 1 addition & 0 deletions packages/design-system/src/components/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export { default as N8nOption } from './N8nOption';
export { default as N8nPopover } from './N8nPopover';
export { default as N8nPulse } from './N8nPulse';
export { default as N8nRadioButtons } from './N8nRadioButtons';
export { default as N8nRoute } from './N8nRoute';
export { default as N8nRecycleScroller } from './N8nRecycleScroller';
export { default as N8nResizeWrapper } from './N8nResizeWrapper';
export { default as N8nSelect } from './N8nSelect';
Expand Down
50 changes: 37 additions & 13 deletions packages/editor-ui/src/components/RunData.vue
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
<script setup lang="ts">
import { useStorage } from '@/composables/useStorage';
import { saveAs } from 'file-saver';
import type {
IBinaryData,
IConnectedNode,
IDataObject,
INodeExecutionData,
INodeOutputConfiguration,
IRunData,
IRunExecutionData,
ITaskMetadata,
NodeError,
NodeHint,
Workflow,
import {
type IBinaryData,
type IConnectedNode,
type IDataObject,
type INodeExecutionData,
type INodeOutputConfiguration,
type IRunData,
type IRunExecutionData,
type ITaskMetadata,
type NodeError,
type NodeHint,
type Workflow,
TRIMMED_TASK_DATA_CONNECTIONS_KEY,
} from 'n8n-workflow';
import { NodeConnectionType, NodeHelpers } from 'n8n-workflow';
import { computed, defineAsyncComponent, onBeforeUnmount, onMounted, ref, toRef, watch } from 'vue';
Expand Down Expand Up @@ -64,6 +65,7 @@ import { isEqual, isObject } from 'lodash-es';
import {
N8nBlockUi,
N8nButton,
N8nRoute,
N8nCallout,
N8nIconButton,
N8nInfoTip,
Expand Down Expand Up @@ -275,6 +277,10 @@ const isArtificialRecoveredEventItem = computed(
() => rawInputData.value?.[0]?.json?.isArtificialRecoveredEventItem,
);
// Truthy when the first item of `rawInputData` carries the
// `TRIMMED_TASK_DATA_CONNECTIONS_KEY` marker in its `json`, i.e. it is a
// stand-in for run data that was trimmed instead of being sent live.
const isTrimmedManualExecutionDataItem = computed(
() => rawInputData.value?.[0]?.json?.[TRIMMED_TASK_DATA_CONNECTIONS_KEY],
);
const subworkflowExecutionError = computed(() => {
if (!node.value) return null;
return {
Expand Down Expand Up @@ -1245,6 +1251,10 @@ function onSearchClear() {
document.dispatchEvent(new KeyboardEvent('keyup', { key: '/' }));
}
/**
 * Click handler for the button shown in place of trimmed run data.
 * Clears the active node name in the NDV store.
 * NOTE(review): presumably this closes the node details view so the
 * executions route becomes visible — confirm against the store.
 */
function onExecutionHistoryNavigate() {
ndvStore.setActiveNodeName(null);
}
function getExecutionLinkLabel(task: ITaskMetadata): string | undefined {
if (task.parentExecution) {
return i18n.baseText('runData.openParentExecution', {
Expand Down Expand Up @@ -1310,7 +1320,7 @@ defineExpose({ enterEditMode });
<slot name="header"></slot>

<div
v-show="!hasRunError"
v-show="!hasRunError && !isTrimmedManualExecutionDataItem"
:class="$style.displayModes"
data-test-id="run-data-pane-header"
@click.stop
Expand Down Expand Up @@ -1591,6 +1601,20 @@ defineExpose({ enterEditMode });
</N8nText>
</div>

<div v-else-if="isTrimmedManualExecutionDataItem" :class="$style.center">
<N8nText bold color="text-dark" size="large">
{{ i18n.baseText('runData.trimmedData.title') }}
</N8nText>
<N8nText>
{{ i18n.baseText('runData.trimmedData.message') }}
</N8nText>
<N8nButton size="small" @click="onExecutionHistoryNavigate">
<N8nRoute :to="`/workflow/${workflowsStore.workflowId}/executions`">
{{ i18n.baseText('runData.trimmedData.button') }}
</N8nRoute>
</N8nButton>
</div>

<div v-else-if="hasNodeRun && isArtificialRecoveredEventItem" :class="$style.center">
<slot name="recovered-artificial-output-data"></slot>
</div>
Expand Down
3 changes: 3 additions & 0 deletions packages/editor-ui/src/plugins/i18n/locales/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -1655,6 +1655,9 @@
"runData.aiContentBlock.tokens": "{count} Tokens",
"runData.aiContentBlock.tokens.prompt": "Prompt:",
"runData.aiContentBlock.tokens.completion": "Completion:",
"runData.trimmedData.title": "Data too large to display",
"runData.trimmedData.message": "The data is too large to be shown here. View the full details in 'Executions' tab.",
"runData.trimmedData.button": "See execution",
"saveButton.save": "@:_reusableBaseText.save",
"saveButton.saved": "Saved",
"saveWorkflowButton.hint": "Save workflow",
Expand Down
7 changes: 7 additions & 0 deletions packages/workflow/src/Constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,10 @@ export const LANGCHAIN_CUSTOM_TOOLS = [
export const SEND_AND_WAIT_OPERATION = 'sendAndWait';
export const AI_TRANSFORM_CODE_GENERATED_FOR_PROMPT = 'codeGeneratedForPrompt';
export const AI_TRANSFORM_JS_CODE = 'jsCode';

/**
 * Key for an item standing in for a manual execution data item too large to be
 * sent live via pubsub. See {@link TRIMMED_TASK_DATA_CONNECTIONS} in constants
 * in `cli` package, where this key is set to `true` on the stand-in item's
 * `json`. Clients check for this key to detect trimmed data.
 */
export const TRIMMED_TASK_DATA_CONNECTIONS_KEY = '__isTrimmedManualExecutionDataItem';
Loading