Skip to content

Commit ccc16ff

Browse files
authored
Engine: Limit payload sizes of events (#890)
* engine: force a limit on payloads exiting the engine * engine: add redacted flag to redacted events * worker: remove payload validation logic * typing * fix eventing * typing * tweak error message * integration test * prefer Buffer .byteLength to new Blob() * changeset * remove clutter * engine: optionally return the run result. If the result is large, it might trigger OOM while being returned to the main process * tweak error message * extra engine changeset * typo * engine: remove debug code * version: [email protected]
1 parent b4f0f12 commit ccc16ff

33 files changed

+347
-93
lines changed

integration-tests/worker/CHANGELOG.md

+10
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,15 @@
11
# @openfn/integration-tests-worker
22

3+
## 1.0.79
4+
5+
### Patch Changes
6+
7+
- Updated dependencies [deb7293]
8+
- Updated dependencies [d50c05d]
9+
- @openfn/engine-multi@1.6.0
10+
- @openfn/ws-worker@1.12.0
11+
- @openfn/lightning-mock@2.1.2
12+
313
## 1.0.78
414

515
### Patch Changes

integration-tests/worker/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@openfn/integration-tests-worker",
33
"private": true,
4-
"version": "1.0.78",
4+
"version": "1.0.79",
55
"description": "Lightning WOrker integration tests",
66
"author": "Open Function Group <[email protected]>",
77
"license": "ISC",

integration-tests/worker/test/integration.test.ts

+39
Original file line numberDiff line numberDiff line change
@@ -974,6 +974,45 @@ test.serial('Redact logs which exceed the payload limit', (t) => {
974974
});
975975
});
976976

977+
// Runs a single-job workflow with a payload limit of 0mb and checks that the
// step:complete event reports DATACLIP_TOO_LARGE instead of carrying a dataclip.
test.serial(
  "Don't return dataclips which exceed the payload limit",
  async (t) => {
    // An async test function (rather than `new Promise(async ...)`) means a
    // failure in worker.destroy() or initWorker() rejects the test instead of
    // being lost and leaving the test hanging.
    if (!worker.destroyed) {
      await worker.destroy();
    }

    ({ worker } = await initWorker(lightningPort, {
      maxWorkers: 1,
      // use the dummy repo to remove autoinstall
      repoDir: path.resolve('./dummy-repo'),
    }));

    const run = {
      id: crypto.randomUUID(),
      jobs: [
        {
          adaptor: '@openfn/[email protected]',
          body: `fn(() => ({ data: 'abdef' }))`,
        },
      ],
      options: {
        // NOTE(review): presumably a 0mb limit makes any non-empty output
        // state exceed the limit - confirm against the worker's handling
        payload_limit_mb: 0,
      },
    };

    // Listen for completion before enqueueing so the event cannot be missed
    const runComplete = new Promise<void>((resolve) => {
      lightning.once('run:complete', () => {
        resolve();
      });
    });

    // The run has a single job, so one step:complete event is expected.
    // `once` (matching the run:complete listener) avoids leaving behind a
    // listener that still holds this test's `t` after the test has finished.
    lightning.once('step:complete', (evt) => {
      t.is(evt.payload.output_dataclip_error, 'DATACLIP_TOO_LARGE');
      t.falsy(evt.payload.output_dataclip_id);
      t.falsy(evt.payload.output_dataclip);
    });

    lightning.enqueueRun(run);

    await runComplete;
  }
);
1015+
9771016
test.serial(
9781017
"Don't send job logs to stdout when job_log_level is set to none",
9791018
(t) => {

packages/engine-multi/CHANGELOG.md

+10
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,15 @@
11
# engine-multi
22

3+
## 1.6.0
4+
5+
### Minor Changes
6+
7+
- d50c05d: Fix an issue where large payloads can cause the worker to OOM crash
8+
9+
### Patch Changes
10+
11+
- deb7293: Don't return the result of a task unless explicitly requested
12+
313
## 1.5.1
414

515
### Patch Changes

packages/engine-multi/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@openfn/engine-multi",
3-
"version": "1.5.1",
3+
"version": "1.6.0",
44
"description": "Multi-process runtime engine",
55
"main": "dist/index.js",
66
"type": "module",

packages/engine-multi/src/api/call-worker.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ export default function initWorkers(
3737
const callWorker: CallWorker = (
3838
task,
3939
args = [],
40-
events = [],
40+
events = {},
4141
options = {}
4242
) => {
4343
return workers.exec(task, args, {

packages/engine-multi/src/api/execute.ts

+1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ const execute = async (context: ExecutionContext) => {
5555

5656
const workerOptions = {
5757
memoryLimitMb: options.memoryLimitMb,
58+
payloadLimitMb: options.payloadLimitMb,
5859
timeout: options.runTimeoutMs,
5960
};
6061

packages/engine-multi/src/api/lifecycle.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -93,14 +93,15 @@ export const jobComplete = (
9393
context: ExecutionContext,
9494
event: internalEvents.JobCompleteEvent
9595
) => {
96-
const { threadId, state, duration, jobId, next, mem } = event;
96+
const { threadId, state, duration, jobId, next, mem, redacted } = event;
9797

9898
context.emit(externalEvents.JOB_COMPLETE, {
9999
threadId,
100100
state,
101101
duration,
102102
jobId,
103103
next,
104+
redacted,
104105
mem,
105106
time: timestamp(),
106107
});

packages/engine-multi/src/engine.ts

+6
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ const DEFAULT_RUN_TIMEOUT = 1000 * 60 * 10; // ms
3131

3232
const DEFAULT_MEMORY_LIMIT_MB = 500;
3333

34+
const DEFAULT_PAYLOAD_LIMIT_MB = 10;
35+
3436
// For each workflow, create an API object with its own event emitter
3537
// this is a bit weird - what if the emitter went on state instead?
3638
const createWorkflowEvents = (
@@ -72,6 +74,7 @@ export type EngineOptions = {
7274
logger: Logger;
7375
maxWorkers?: number;
7476
memoryLimitMb?: number;
77+
payloadLimitMb?: number;
7578
noCompile?: boolean; // TODO deprecate in favour of compile
7679
repoDir: string;
7780
resolvers?: LazyResolvers;
@@ -100,6 +103,8 @@ const createEngine = async (
100103

101104
const defaultTimeout = options.runTimeoutMs || DEFAULT_RUN_TIMEOUT;
102105
const defaultMemoryLimit = options.memoryLimitMb || DEFAULT_MEMORY_LIMIT_MB;
106+
const defaultPayloadLimit =
107+
options.payloadLimitMb || DEFAULT_PAYLOAD_LIMIT_MB;
103108

104109
let resolvedWorkerPath;
105110
if (workerPath) {
@@ -173,6 +178,7 @@ const createEngine = async (
173178
resolvers: opts.resolvers,
174179
runTimeoutMs: opts.runTimeoutMs ?? defaultTimeout,
175180
memoryLimitMb: opts.memoryLimitMb ?? defaultMemoryLimit,
181+
payloadLimitMb: opts.payloadLimitMb ?? defaultPayloadLimit,
176182
jobLogLevel: opts.jobLogLevel,
177183
},
178184
});

packages/engine-multi/src/events.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ export interface JobCompletePayload extends ExternalEvent {
7777
state: any; // the result state
7878
next: string[]; // downstream jobs
7979
time: bigint;
80+
redacted?: boolean;
8081
mem: {
8182
job: number;
8283
system: number;
@@ -92,7 +93,9 @@ export interface JobErrorPayload extends ExternalEvent {
9293
next: string[]; // downstream jobs
9394
}
9495

95-
export interface WorkerLogPayload extends ExternalEvent, SerializedLogEvent {}
96+
export interface WorkerLogPayload extends ExternalEvent, SerializedLogEvent {
97+
redacted?: boolean;
98+
}
9699

97100
export interface EdgeResolvedPayload extends ExternalEvent {
98101
edgeId: string; // interesting, we don't really have this yet. Is index more appropriate? key? yeah, it's target node basically

packages/engine-multi/src/types.ts

+1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ export type ExecutionContextConstructor = {
4545
};
4646

4747
export type ExecuteOptions = {
48+
payloadLimitMb?: number;
4849
memoryLimitMb?: number;
4950
resolvers?: LazyResolvers;
5051
runTimeoutMs?: number;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/**
 * Placeholder assigned to `payload.state` when the real state exceeds the
 * payload limit.
 */
export const REDACTED_STATE = {
  data: '[REDACTED_STATE]',
  _$REDACTED$_: true,
};

/**
 * Fields merged over `payload.log` when the real log exceeds the payload
 * limit.
 */
export const REDACTED_LOG = {
  message: ['[REDACTED: Message length exceeds payload limit]'],
  _$REDACTED$_: true,
};

/**
 * Throw if the serialized size of `value` is greater than `limit_mb`.
 *
 * Strings are measured as-is; anything else is measured via JSON.stringify.
 * Size is the UTF-8 byte length divided by 1024 * 1024.
 *
 * Falsy values, and a `limit_mb` that is NaN, are never checked.
 * If the value cannot be measured (e.g. JSON.stringify throws), the size is
 * treated as 0 and the value passes: this is a deliberate best-effort check.
 *
 * @param value the value to measure
 * @param limit_mb the maximum allowed size in megabytes (default 10)
 * @throws an Error named PAYLOAD_TOO_LARGE when the size exceeds the limit
 */
export const verify = (value: any, limit_mb: number = 10) => {
  if (value && !isNaN(limit_mb)) {
    let size_mb = 0;
    try {
      const str = typeof value === 'string' ? value : JSON.stringify(value);
      const size_bytes = Buffer.byteLength(str, 'utf8');
      size_mb = size_bytes / 1024 / 1024;
    } catch (e) {
      // Best effort: a value we cannot serialize is treated as size 0
    }

    if (size_mb > limit_mb) {
      const e = new Error(
        `The payload exceeded the size limit of ${limit_mb}mb`
      );
      e.name = 'PAYLOAD_TOO_LARGE';
      throw e;
    }
  }
};

/**
 * Return a shallow copy of an event payload in which an oversized `state`
 * and/or `log` has been replaced with a redacted placeholder.
 *
 * When anything is redacted, `redacted: true` is set on the returned payload.
 * The incoming payload (including its `log` object) is never modified.
 *
 * @param payload an event payload which may carry `state` and/or `log`
 * @param limit_mb the maximum allowed size in megabytes (default 10)
 * @returns the new, possibly redacted, payload
 */
const ensurePayloadSize = (payload: any, limit_mb: number = 10) => {
  const newPayload = { ...payload };

  // The payload could be any of the runtime events
  // The bits we might want to redact are state and message
  try {
    verify(payload.state, limit_mb);
  } catch (e) {
    newPayload.state = REDACTED_STATE;
    newPayload.redacted = true;
  }
  try {
    verify(payload.log, limit_mb);
  } catch (e) {
    // newPayload is only a shallow copy, so build a fresh log object rather
    // than writing into payload.log (which the caller still owns).
    // A non-object log (e.g. an oversized string) has no fields worth
    // keeping, so it is replaced outright.
    const original = typeof payload.log === 'object' ? payload.log : {};
    newPayload.log = { ...original, ...REDACTED_LOG };
    newPayload.redacted = true;
  }
  return newPayload;
};

export default ensurePayloadSize;

packages/engine-multi/src/worker/child/create-thread.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ import { ENGINE_RUN_TASK } from '../events';
55

66
const scriptPath = process.argv[2];
77

8-
type ThreadOptions = {
8+
export type ThreadOptions = {
99
memoryLimitMb?: number;
10+
payloadLimitMb?: number;
1011
};
1112

1213
const createThread = (
@@ -24,6 +25,7 @@ const createThread = (
2425
type: ENGINE_RUN_TASK,
2526
task,
2627
args,
28+
options,
2729
});
2830

2931
return worker;

packages/engine-multi/src/worker/child/runner.ts

+6-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import {
77
ENGINE_RESOLVE_TASK,
88
ENGINE_RUN_TASK,
99
} from '../events';
10-
import createThread from './create-thread';
10+
import createThread, { ThreadOptions } from './create-thread';
1111
import serializeError from '../../util/serialize-error';
1212

1313
process.on('message', async (evt: WorkerEvent) => {
@@ -17,7 +17,11 @@ process.on('message', async (evt: WorkerEvent) => {
1717
}
1818
});
1919

20-
const run = async (task: string, args: any[] = [], options = {}) => {
20+
const run = async (
21+
task: string,
22+
args: any[] = [],
23+
options: ThreadOptions = {}
24+
) => {
2125
const thread = createThread(task, args, options);
2226

2327
thread.on('error', (e) => {

packages/engine-multi/src/worker/events.ts

+1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ export interface JobStartEvent extends InternalEvent {
4949
export interface JobCompleteEvent extends InternalEvent {
5050
jobId: string;
5151
state: any;
52+
redacted?: boolean;
5253
duration: number;
5354
next: string[];
5455
mem: {

packages/engine-multi/src/worker/pool.ts

+4
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ export type ExecOpts = {
3434
timeout?: number; // ms
3535

3636
memoryLimitMb?: number;
37+
payloadLimitMb?: number;
3738
};
3839

3940
export type ChildProcessPool = Array<ChildProcess | false>;
@@ -210,6 +211,7 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
210211
args,
211212
options: {
212213
memoryLimitMb: opts.memoryLimitMb,
214+
payloadLimitMb: opts.payloadLimitMb,
213215
},
214216
} as RunTaskEvent);
215217
} catch (e) {
@@ -220,6 +222,8 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
220222
worker.on('exit', onExit);
221223

222224
worker.on('message', (evt: any) => {
225+
// TODO I think here we may have to decode the payload
226+
223227
// forward the message out of the pool
224228
opts.on?.(evt);
225229

packages/engine-multi/src/worker/thread/helpers.ts

+18-3
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,28 @@ export const createLoggers = (
6363
return { logger, jobLogger, adaptorLogger };
6464
};
6565

66+
type Options = {
67+
/**
68+
* Should we return results directly?
69+
* Useful for tests but dangerous in production
70+
* as can cause OOM errors for large results
71+
* */
72+
directReturn?: boolean;
73+
74+
/**
75+
* Allow a custom publish function to be passed in
76+
*/
77+
publish?: typeof publish;
78+
};
79+
6680
// Execute wrapper function
6781
export const execute = async (
6882
workflowId: string,
6983
executeFn: () => Promise<any> | undefined,
70-
publishFn = publish
84+
options: Options = {}
7185
) => {
86+
const publishFn = options.publish ?? publish;
87+
7288
const handleError = (err: any) => {
7389
publishFn(workerEvents.ERROR, {
7490
// @ts-ignore
@@ -127,8 +143,7 @@ export const execute = async (
127143
const result = await executeFn();
128144
publishFn(workerEvents.WORKFLOW_COMPLETE, { workflowId, state: result });
129145

130-
// For tests
131-
return result;
146+
return options.directReturn ? result : {};
132147
} catch (err: any) {
133148
handleError(err);
134149
}

packages/engine-multi/src/worker/thread/mock-run.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,5 +87,5 @@ function mockRun(plan: MockExecutionPlan, input: State, _options = {}) {
8787

8888
register({
8989
run: async (plan: MockExecutionPlan, input: State, _options?: any) =>
90-
execute(plan.id, () => mockRun(plan, input)),
90+
execute(plan.id, () => mockRun(plan, input), { directReturn: true }),
9191
});

packages/engine-multi/src/worker/thread/run.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ register({
4343
console = adaptorLogger;
4444

4545
// Leave console.debug for local debugging
46-
// This goes to stdout but not the adpator logger
46+
// This goes to stdout but not the adaptor logger
4747
console.debug = debug;
4848

4949
// TODO I would like to pull these options out of here

0 commit comments

Comments
 (0)