From 919d895956fb73fcbc684a7485c3ff8a8ecd678b Mon Sep 17 00:00:00 2001 From: Rico Huijbers Date: Wed, 27 Oct 2021 10:46:16 +0200 Subject: [PATCH] feat(rosetta): improve translation throughput (#3083) Previously, Rosetta would divide all the examples to translate into `N` equally sized arrays, and spawn `N` workers to translate them all. Experimentation shows that the time required to translate samples is very unequally divided, and many workers used to be idle for half of the time after having finished their `1/Nth` of the samples, hurting throughput. Switch to a model where we have `N` workers, and we constantly feed them a small amount of work until all the work is done. This keeps all workers busy until the work is complete, improving the throughput a lot. On my machine, improves a run of Rosetta on the CDK repository with 8 workers from ~30m to ~15m. --- By submitting this pull request, I confirm that my contribution is made under the terms of the [Apache 2.0 license]. [Apache 2.0 license]: https://www.apache.org/licenses/LICENSE-2.0 --- packages/jsii-rosetta/README.md | 9 +- packages/jsii-rosetta/lib/commands/extract.ts | 109 ++++++++---------- .../lib/commands/extract_worker.ts | 35 ++---- packages/jsii-rosetta/package.json | 2 + yarn.lock | 21 ++-- 5 files changed, 82 insertions(+), 94 deletions(-) diff --git a/packages/jsii-rosetta/README.md b/packages/jsii-rosetta/README.md index 002776372e..19949fca4e 100644 --- a/packages/jsii-rosetta/README.md +++ b/packages/jsii-rosetta/README.md @@ -189,6 +189,13 @@ terminate the option list by passing `--`). Since TypeScript compilation takes a lot of time, much time can be gained by using the CPUs in your system effectively. `jsii-rosetta extract` will run the compilations in parallel if support for NodeJS Worker Threads is detected. -If worker thread support is available, `jsii-rosetta` will use a number of workers equal to half the number of CPU cores, +`jsii-rosetta` will use a number of workers equal to half the number of CPU cores, up to a maximum of 16 workers. This default maximum can be overridden by setting the `JSII_ROSETTA_MAX_WORKER_COUNT` environment variable. + +If you get out of memory errors running too many workers, run a command like this to up the memory allowed for your workers: + +``` +$ /sbin/sysctl -w vm.max_map_count=2251954 +``` + diff --git a/packages/jsii-rosetta/lib/commands/extract.ts b/packages/jsii-rosetta/lib/commands/extract.ts index 6a174ed391..b478ab5e28 100644 --- a/packages/jsii-rosetta/lib/commands/extract.ts +++ b/packages/jsii-rosetta/lib/commands/extract.ts @@ -1,7 +1,7 @@ import * as os from 'os'; import * as path from 'path'; import * as ts from 'typescript'; -import * as v8 from 'v8'; +import * as workerpool from 'workerpool'; import { loadAssemblies, allTypeScriptSnippets } from '../jsii/assemblies'; import * as logging from '../logging'; @@ -9,7 +9,7 @@ import { TypeScriptSnippet } from '../snippet'; import { snippetKey } from '../tablets/key'; import { LanguageTablet, TranslatedSnippet } from '../tablets/tablets'; import { RosettaDiagnostic, Translator, rosettaDiagFromTypescript } from '../translate'; -import { divideEvenly } from '../util'; +import type { TranslateBatchRequest, TranslateBatchResponse } from './extract_worker'; export interface ExtractResult { diagnostics: RosettaDiagnostic[]; @@ -81,24 +81,13 @@ function* filterSnippets(ts: IterableIterator, includeIds: st /** * Translate all snippets * - * Uses a worker-based parallel translation if available, falling back to a single-threaded workflow if not. + * We are now always using workers, as we are targeting Node 12+. */ async function translateAll( snippets: IterableIterator, includeCompilerDiagnostics: boolean, ): Promise { - try { - const worker = await import('worker_threads'); - - return await workerBasedTranslateAll(worker, snippets, includeCompilerDiagnostics); - } catch (e) { - if (e.code !== 'MODULE_NOT_FOUND') { - throw e; - } - logging.warn('Worker threads not available (use NodeJS >= 10.5 and --experimental-worker). Working sequentially.'); - - return singleThreadedTranslateAll(snippets, includeCompilerDiagnostics); - } + return workerBasedTranslateAll(snippets, includeCompilerDiagnostics); } /** @@ -140,65 +129,65 @@ export function singleThreadedTranslateAll( /** * Divide the work evenly over all processors by running 'extract_worker' in Worker Threads, then combine results * + * The workers are fed small queues of work each. We used to divide the entire queue into N + * but since the work is divided unevenly that led to some workers stopping early, idling while + * waiting for more work. + * * Never include 'extract_worker' directly, only do TypeScript type references (so that in * the script we may assume that 'worker_threads' successfully imports). */ async function workerBasedTranslateAll( - worker: typeof import('worker_threads'), snippets: IterableIterator, includeCompilerDiagnostics: boolean, ): Promise { - // Use about half the advertised cores because hyperthreading doesn't seem to help that - // much (on my machine, using more than half the cores actually makes it slower). + // Use about half the advertised cores because hyperthreading doesn't seem to + // help that much, or we become I/O-bound at some point. On my machine, using + // more than half the cores actually makes it slower. // Cap to a reasonable top-level limit to prevent thrash on machines with many, many cores. const maxWorkers = parseInt(process.env.JSII_ROSETTA_MAX_WORKER_COUNT ?? '16'); const N = Math.min(maxWorkers, Math.max(1, Math.ceil(os.cpus().length / 2))); const snippetArr = Array.from(snippets); - const groups = divideEvenly(N, snippetArr); - logging.info(`Translating ${snippetArr.length} snippets using ${groups.length} workers`); + logging.info(`Translating ${snippetArr.length} snippets using ${N} workers`); - // Run workers - const responses = await Promise.all( - groups.map((snippets) => ({ snippets, includeCompilerDiagnostics })).map(runWorker), - ); + const pool = workerpool.pool(path.join(__dirname, 'extract_worker.js'), { + maxWorkers: N, + }); - // Combine results - const x = responses.reduce( - (acc, current) => { - // Modifying 'acc' in place to not incur useless copying - acc.translatedSnippetSchemas.push(...current.translatedSnippetSchemas); - acc.diagnostics.push(...current.diagnostics); - return acc; - }, - { translatedSnippetSchemas: [], diagnostics: [] }, - ); - // Hydrate TranslatedSnippets from data back to objects - return { - diagnostics: x.diagnostics, - translatedSnippets: x.translatedSnippetSchemas.map((s) => TranslatedSnippet.fromSchema(s)), - }; + try { + const requests = batchSnippets(snippetArr, includeCompilerDiagnostics); - /** - * Turn running the worker into a nice Promise. - */ - async function runWorker( - request: import('./extract_worker').TranslateRequest, - ): Promise { - return new Promise((resolve, reject) => { - const wrk = new worker.Worker(path.join(__dirname, 'extract_worker.js'), { - resourceLimits: { - // Note: V8 heap statistics are expressed in bytes, so we divide by 1MiB (1,048,576 bytes) - maxOldGenerationSizeMb: Math.ceil(v8.getHeapStatistics().heap_size_limit / 1_048_576), - }, - workerData: request, - }); - wrk.on('message', resolve); - wrk.on('error', reject); - wrk.on('exit', (code) => { - if (code !== 0) { - reject(new Error(`Worker exited with code ${code}`)); - } - }); + const responses: TranslateBatchResponse[] = await Promise.all( + requests.map((request) => pool.exec('translateBatch', [request])), + ); + + const diagnostics = new Array(); + const translatedSnippets = new Array(); + + // Combine results + for (const response of responses) { + diagnostics.push(...response.diagnostics); + translatedSnippets.push(...response.translatedSchemas.map(TranslatedSnippet.fromSchema)); + } + return { diagnostics, translatedSnippets }; + } finally { + // Not waiting on purpose + void pool.terminate(); + } +} + +function batchSnippets( + snippets: TypeScriptSnippet[], + includeCompilerDiagnostics: boolean, + batchSize = 10, +): TranslateBatchRequest[] { + const ret = []; + + for (let i = 0; i < snippets.length; i += batchSize) { + ret.push({ + snippets: snippets.slice(i, i + batchSize), + includeCompilerDiagnostics, }); } + + return ret; } diff --git a/packages/jsii-rosetta/lib/commands/extract_worker.ts b/packages/jsii-rosetta/lib/commands/extract_worker.ts index a677fa3426..8316c1548c 100644 --- a/packages/jsii-rosetta/lib/commands/extract_worker.ts +++ b/packages/jsii-rosetta/lib/commands/extract_worker.ts @@ -1,46 +1,31 @@ /** * Pool worker for extract.ts */ -import * as worker from 'worker_threads'; +import * as workerpool from 'workerpool'; -import * as logging from '../logging'; import { TypeScriptSnippet } from '../snippet'; import { TranslatedSnippetSchema } from '../tablets/schema'; import { RosettaDiagnostic } from '../translate'; import { singleThreadedTranslateAll } from './extract'; -export interface TranslateRequest { - includeCompilerDiagnostics: boolean; - snippets: TypeScriptSnippet[]; +export interface TranslateBatchRequest { + readonly snippets: TypeScriptSnippet[]; + readonly includeCompilerDiagnostics: boolean; } -export interface TranslateResponse { - diagnostics: RosettaDiagnostic[]; +export interface TranslateBatchResponse { // Cannot be 'TranslatedSnippet' because needs to be serializable - translatedSnippetSchemas: TranslatedSnippetSchema[]; + readonly translatedSchemas: TranslatedSnippetSchema[]; + readonly diagnostics: RosettaDiagnostic[]; } -function translateSnippet(request: TranslateRequest): TranslateResponse { +function translateBatch(request: TranslateBatchRequest): TranslateBatchResponse { const result = singleThreadedTranslateAll(request.snippets[Symbol.iterator](), request.includeCompilerDiagnostics); return { + translatedSchemas: result.translatedSnippets.map((s) => s.toSchema()), diagnostics: result.diagnostics, - translatedSnippetSchemas: result.translatedSnippets.map((s) => s.toSchema()), }; } -if (worker.isMainThread) { - // Throw an error to prevent accidental require() of this module. In principle not a big - // deal, but we want to be compatible with run modes where 'worker_threads' is not available - // and by doing this people on platforms where 'worker_threads' is available don't accidentally - // add a require(). - throw new Error('This script should be run as a worker, not included directly.'); -} - -const request: TranslateRequest = worker.workerData; -const startTime = Date.now(); -const response = translateSnippet(request); -const delta = (Date.now() - startTime) / 1000; -// eslint-disable-next-line prettier/prettier -logging.info(`Finished translation of ${request.snippets.length} in ${delta.toFixed(0)}s (${response.translatedSnippetSchemas.length} responses)`); -worker.parentPort!.postMessage(response); +workerpool.worker({ translateBatch }); diff --git a/packages/jsii-rosetta/package.json b/packages/jsii-rosetta/package.json index 3d30ccbde2..920fbbc943 100644 --- a/packages/jsii-rosetta/package.json +++ b/packages/jsii-rosetta/package.json @@ -21,6 +21,7 @@ "@types/jest": "^27.0.2", "@types/mock-fs": "^4.13.1", "@types/node": "^12.20.28", + "@types/workerpool": "^6.1.0", "eslint": "^7.32.0", "jest": "^27.2.4", "jsii": "^0.0.0", @@ -37,6 +38,7 @@ "typescript": "~3.9.10", "sort-json": "^2.0.0", "@xmldom/xmldom": "^0.7.5", + "workerpool": "^6.1.5", "yargs": "^16.2.0" }, "license": "Apache-2.0", diff --git a/yarn.lock b/yarn.lock index 3399384f29..4525fd0fe6 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1585,14 +1585,7 @@ dependencies: "@types/istanbul-lib-report" "*" -"@types/jest-expect-message@^1.0.3": - version "1.0.3" - resolved "https://registry.yarnpkg.com/@types/jest-expect-message/-/jest-expect-message-1.0.3.tgz#433ad89565c871acefafedeb957d0a8ec2a977a0" - integrity sha512-sp70Lc8POkOcXHEcLERpX/7B/BtQiqIYz3AvC9ZMNKSaiDttr8hKvz9DljIn7N6WJi3ioVoTtB1utDAX46oPlg== - dependencies: - "@types/jest" "*" - -"@types/jest@*", "@types/jest@^27.0.2": +"@types/jest@^27.0.2": version "27.0.2" resolved "https://registry.yarnpkg.com/@types/jest/-/jest-27.0.2.tgz#ac383c4d4aaddd29bbf2b916d8d105c304a5fcd7" integrity sha512-4dRxkS/AFX0c5XW6IPMNOydLn2tEhNhJV7DnYK+0bjoJZ+QTmfucBlihX7aoEsh/ocYtkLC73UbnBXBXIxsULA== @@ -1699,6 +1692,13 @@ dependencies: "@types/node" "*" +"@types/workerpool@^6.1.0": + version "6.1.0" + resolved "https://registry.yarnpkg.com/@types/workerpool/-/workerpool-6.1.0.tgz#16c3b9d3c62a8f6e6ad2e4d6212a68130f0cd3b1" + integrity sha512-C+J/c1BHyc351xJuiH2Jbe+V9hjf5mCzRP0UK4KEpF5SpuU+vJ/FC5GLZsCU/PJpp/3I6Uwtfm3DG7Lmrb7LOQ== + dependencies: + "@types/node" "*" + "@types/yargs-parser@*": version "20.2.1" resolved "https://registry.yarnpkg.com/@types/yargs-parser/-/yargs-parser-20.2.1.tgz#3b9ce2489919d9e4fea439b76916abc34b2df129" @@ -7969,6 +7969,11 @@ wordwrap@^1.0.0: resolved "https://registry.yarnpkg.com/wordwrap/-/wordwrap-1.0.0.tgz#27584810891456a4171c8d0226441ade90cbcaeb" integrity sha1-J1hIEIkUVqQXHI0CJkQa3pDLyus= +workerpool@^6.1.5: + version "6.1.5" + resolved "https://registry.yarnpkg.com/workerpool/-/workerpool-6.1.5.tgz#0f7cf076b6215fd7e1da903ff6f22ddd1886b581" + integrity sha512-XdKkCK0Zqc6w3iTxLckiuJ81tiD/o5rBE/m+nXpRCB+/Sq4DqkfXZ/x0jW02DG1tGsfUGXbTJyZDP+eu67haSw== + wrap-ansi@^6.2.0: version "6.2.0" resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-6.2.0.tgz#e9393ba07102e6c91a3b221478f0257cd2856e53"