-
Notifications
You must be signed in to change notification settings - Fork 12k
/
Copy pathaction-executor.ts
153 lines (131 loc) · 4.77 KB
/
action-executor.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
/**
* @license
* Copyright Google Inc. All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/
import JestWorker from 'jest-worker';
import * as os from 'os';
import * as path from 'path';
import * as v8 from 'v8';
import { BundleActionCache } from './action-cache';
import { I18nOptions } from './i18n-options';
import { InlineOptions, ProcessBundleOptions, ProcessBundleResult } from './process-bundle';
const hasThreadSupport = (() => {
try {
require('worker_threads');
return true;
} catch {
return false;
}
})();
// This is used to normalize serialization messaging across threads and processes
// Threads use the structured clone algorithm which handles more types
// Processes use JSON which is much more limited
const serialize = ((v8 as unknown) as { serialize(value: unknown): Buffer }).serialize;
let workerFile = require.resolve('./process-bundle');
workerFile =
path.extname(workerFile) === '.ts'
? require.resolve('./process-bundle-bootstrap')
: workerFile;
export class BundleActionExecutor {
private largeWorker?: JestWorker;
private smallWorker?: JestWorker;
private cache?: BundleActionCache;
constructor(
private workerOptions: { cachePath?: string; i18n: I18nOptions },
integrityAlgorithm?: string,
private readonly sizeThreshold = 32 * 1024,
) {
if (workerOptions.cachePath) {
this.cache = new BundleActionCache(workerOptions.cachePath, integrityAlgorithm);
}
}
private static executeMethod<O>(worker: JestWorker, method: string, input: unknown): Promise<O> {
return ((worker as unknown) as Record<string, (i: unknown) => Promise<O>>)[method](input);
}
private ensureLarge(): JestWorker {
if (this.largeWorker) {
return this.largeWorker;
}
// larger files are processed in a separate process to limit memory usage in the main process
return (this.largeWorker = new JestWorker(workerFile, {
exposedMethods: ['process', 'inlineLocales'],
setupArgs: [[...serialize(this.workerOptions)]],
}));
}
private ensureSmall(): JestWorker {
if (this.smallWorker) {
return this.smallWorker;
}
// small files are processed in a limited number of threads to improve speed
// The limited number also prevents a large increase in memory usage for an otherwise short operation
return (this.smallWorker = new JestWorker(workerFile, {
exposedMethods: ['process', 'inlineLocales'],
setupArgs: hasThreadSupport ? [this.workerOptions] : [[...serialize(this.workerOptions)]],
numWorkers: os.cpus().length < 2 ? 1 : 2,
enableWorkerThreads: hasThreadSupport,
}));
}
private executeAction<O>(method: string, action: { code: string }): Promise<O> {
// code.length is not an exact byte count but close enough for this
if (action.code.length > this.sizeThreshold) {
return BundleActionExecutor.executeMethod<O>(this.ensureLarge(), method, action);
} else {
return BundleActionExecutor.executeMethod<O>(this.ensureSmall(), method, action);
}
}
async process(action: ProcessBundleOptions): Promise<ProcessBundleResult> {
if (this.cache) {
const cacheKeys = this.cache.generateCacheKeys(action);
action.cacheKeys = cacheKeys;
// Try to get cached data, if it fails fallback to processing
try {
const cachedResult = await this.cache.getCachedBundleResult(action);
if (cachedResult) {
return cachedResult;
}
} catch {}
}
return this.executeAction<ProcessBundleResult>('process', action);
}
processAll(actions: Iterable<ProcessBundleOptions>): AsyncIterable<ProcessBundleResult> {
return BundleActionExecutor.executeAll(actions, action => this.process(action));
}
async inline(
action: InlineOptions,
): Promise<{ file: string; diagnostics: { type: string; message: string }[]; count: number }> {
return this.executeAction('inlineLocales', action);
}
inlineAll(actions: Iterable<InlineOptions>) {
return BundleActionExecutor.executeAll(actions, action => this.inline(action));
}
private static async *executeAll<I, O>(
actions: Iterable<I>,
executor: (action: I) => Promise<O>,
): AsyncIterable<O> {
const executions = new Map<Promise<O>, Promise<O>>();
for (const action of actions) {
const execution = executor(action);
executions.set(
execution,
execution.then(result => {
executions.delete(execution);
return result;
}),
);
}
while (executions.size > 0) {
yield Promise.race(executions.values());
}
}
stop(): void {
if (this.largeWorker) {
this.largeWorker.end();
}
if (this.smallWorker) {
this.smallWorker.end();
}
}
}