-
Notifications
You must be signed in to change notification settings - Fork 12k
/
Copy pathmonitored-process.ts
137 lines (117 loc) · 4.48 KB
/
monitored-process.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/**
* @license
* Copyright Google LLC All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/
import { SpawnOptions, spawn } from 'child_process';
import pidusage from 'pidusage';
import { Observable, Subject, from, timer } from 'rxjs';
import { concatMap, map, onErrorResumeNext, tap } from 'rxjs/operators';
import { Command } from './command';
import { AggregatedProcessStats, MonitoredProcess } from './interfaces';
const pidtree = require('pidtree');
const treeKill = require('tree-kill');
// Cleanup when the parent process exits.
const defaultProcessExitCb = () => {};
let processExitCb = defaultProcessExitCb;
process.on('exit', () => {
processExitCb();
processExitCb = defaultProcessExitCb;
});
export class LocalMonitoredProcess implements MonitoredProcess {
private stats = new Subject<AggregatedProcessStats>();
private stdout = new Subject<Buffer>();
private stderr = new Subject<Buffer>();
private pollingRate = 100;
private elapsedTimer = 0;
stats$: Observable<AggregatedProcessStats> = this.stats.asObservable();
stdout$: Observable<Buffer> = this.stdout.asObservable();
stderr$: Observable<Buffer> = this.stderr.asObservable();
constructor(private command: Command, private useProcessTime = true) {}
run(): Observable<number> {
return new Observable((obs) => {
const { cmd, cwd, args } = this.command;
const spawnOptions: SpawnOptions = { cwd, shell: true };
if (!this.useProcessTime) {
this.resetElapsedTimer();
}
// Spawn the process.
const childProcess = spawn(cmd, args, spawnOptions);
// Emit output and stats.
childProcess.stdout?.on('data', (data: Buffer) => this.stdout.next(data));
childProcess.stderr?.on('data', (data: Buffer) => this.stderr.next(data));
const statsSubs = timer(0, this.pollingRate)
.pipe(
concatMap(() => from(pidtree(childProcess.pid, { root: true }) as Promise<number[]>)),
concatMap((pids: number[]) => from(pidusage(pids, { maxage: 5 * this.pollingRate }))),
map((statsByProcess) => {
// Ignore the spawned shell in the total process number.
const pids = Object.keys(statsByProcess).filter(
(pid) => pid != childProcess.pid.toString(),
);
const processes = pids.length;
// We want most stats from the parent process.
const { pid, ppid, ctime, elapsed, timestamp } = statsByProcess[childProcess.pid];
// CPU and memory should be agreggated.
let cpu = 0;
let memory = 0;
for (const pid of pids) {
cpu += statsByProcess[pid].cpu;
memory += statsByProcess[pid].memory;
}
const stats: AggregatedProcessStats = {
processes,
cpu,
memory,
pid,
ppid,
ctime,
elapsed: this.useProcessTime ? elapsed : Date.now() - this.elapsedTimer,
timestamp,
};
return stats;
}),
tap((stats) => this.stats.next(stats)),
onErrorResumeNext(),
)
.subscribe();
// Process event handling.
// Killing processes cross platform can be hard, treeKill helps.
const killChildProcess = () => {
if (childProcess && childProcess.pid) {
treeKill(childProcess.pid, 'SIGTERM');
}
};
// Convert process exit codes and errors into observable events.
const handleChildProcessExit = (code?: number, error?: Error) => {
// Stop gathering stats and complete subjects.
statsSubs.unsubscribe();
this.stats.complete();
this.stdout.complete();
this.stderr.complete();
// Kill hanging child processes and emit error/exit code.
killChildProcess();
if (error) {
obs.error(error);
}
obs.next(code);
obs.complete();
};
childProcess.once('exit', handleChildProcessExit);
childProcess.once('error', (err) => handleChildProcessExit(1, err));
processExitCb = killChildProcess;
// Cleanup on unsubscription.
return killChildProcess;
});
}
resetElapsedTimer() {
if (this.useProcessTime) {
throw new Error(
`Cannot reset elapsed timer when using process time. Set 'useProcessTime' to false.`,
);
}
this.elapsedTimer = Date.now();
}
}