Skip to content

Commit 4c4ad66

Browse files
tomerdcompnerd
andauthored
improve process state management (#209)
motivation: improve performance and robustness of process state management changes: * refactor Process to use a state machine to track the process execution state * replace use of DispatchQueue with Locks to protect state * adjust windows implementation rdar://76087764 Co-authored-by: Saleem Abdulrasool <compnerd@compnerd.org>
1 parent 8ce7964 commit 4c4ad66

File tree

2 files changed

+171
-81
lines changed

2 files changed

+171
-81
lines changed

Sources/TSCBasic/Process.swift

Lines changed: 170 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,15 @@ public final class Process: ObjectIdentifierProtocol {
178178
}
179179
}
180180

181+
// process execution mutable state
182+
private enum State {
183+
case idle
184+
case readingOutputThread(stdout: Thread, stderr: Thread?)
185+
case readingOutputPipe(sync: DispatchGroup)
186+
case outputReady(stdout: Result<[UInt8], Swift.Error>, stderr: Result<[UInt8], Swift.Error>)
187+
case complete(ProcessResult)
188+
}
189+
181190
/// Typealias for process id type.
182191
#if !os(Windows)
183192
public typealias ProcessID = pid_t
@@ -219,36 +228,36 @@ public final class Process: ObjectIdentifierProtocol {
219228
public private(set) var processID = ProcessID()
220229
#endif
221230

222-
/// If the subprocess has launched.
223-
/// Note: This property is not protected by the serial queue because it is only mutated in `launch()`, which will be
224-
/// called only once.
225-
public private(set) var launched = false
231+
// process execution mutable state
232+
private var state: State = .idle
233+
private let stateLock = Lock()
226234

227235
/// The result of the process execution. Available after process is terminated.
236+
/// This will block while the process is awaiting result
237+
@available(*, deprecated, message: "use waitUntilExit instead")
228238
public var result: ProcessResult? {
229-
return self.serialQueue.sync {
230-
self._result
239+
return self.stateLock.withLock {
240+
switch self.state {
241+
case .complete(let result):
242+
return result
243+
default:
244+
return nil
245+
}
231246
}
232247
}
233248

234-
/// How process redirects its output.
235-
public let outputRedirection: OutputRedirection
249+
// ideally we would use the state for this, but we need to access it while the waitForExit is locking state
250+
private var _launched = false
251+
private let launchedLock = Lock()
236252

237-
/// The result of the process execution. Available after process is terminated.
238-
private var _result: ProcessResult?
239-
240-
/// If redirected, stdout result and reference to the thread reading the output.
241-
private var stdout: (result: Result<[UInt8], Swift.Error>, thread: Thread?) = (.success([]), nil)
242-
243-
/// If redirected, stderr result and reference to the thread reading the output.
244-
private var stderr: (result: Result<[UInt8], Swift.Error>, thread: Thread?) = (.success([]), nil)
245-
246-
/// Queue to protect concurrent reads.
247-
private let serialQueue = DispatchQueue(label: "org.swift.swiftpm.process")
253+
public var launched: Bool {
254+
return self.launchedLock.withLock {
255+
return self._launched
256+
}
257+
}
248258

249-
/// Queue to protect reading/writing on map of validated executables.
250-
private static let executablesQueue = DispatchQueue(
251-
label: "org.swift.swiftpm.process.findExecutable")
259+
/// How process redirects its output.
260+
public let outputRedirection: OutputRedirection
252261

253262
/// Indicates if a new progress group is created for the child process.
254263
private let startNewProcessGroup: Bool
@@ -257,7 +266,8 @@ public final class Process: ObjectIdentifierProtocol {
257266
///
258267
/// Key: Executable name or path.
259268
/// Value: Path to the executable, if found.
260-
static private var validatedExecutablesMap = [String: AbsolutePath?]()
269+
private static var validatedExecutablesMap = [String: AbsolutePath?]()
270+
private static let validatedExecutablesMapLock = Lock()
261271

262272
/// Create a new process instance.
263273
///
@@ -348,7 +358,7 @@ public final class Process: ObjectIdentifierProtocol {
348358
}
349359
// This should cover the most common cases, i.e. when the cache is most helpful.
350360
if workingDirectory == localFileSystem.currentWorkingDirectory {
351-
return Process.executablesQueue.sync {
361+
return Process.validatedExecutablesMapLock.withLock {
352362
if let value = Process.validatedExecutablesMap[program] {
353363
return value
354364
}
@@ -367,10 +377,11 @@ public final class Process: ObjectIdentifierProtocol {
367377
@discardableResult
368378
public func launch() throws -> WritableByteStream {
369379
precondition(arguments.count > 0 && !arguments[0].isEmpty, "Need at least one argument to launch the process.")
370-
precondition(!launched, "It is not allowed to launch the same process object again.")
371380

372-
// Set the launch bool to true.
373-
launched = true
381+
self.launchedLock.withLock {
382+
precondition(!self._launched, "It is not allowed to launch the same process object again.")
383+
self._launched = true
384+
}
374385

375386
// Print the arguments if we are verbose.
376387
if self.verbose {
@@ -393,30 +404,69 @@ public final class Process: ObjectIdentifierProtocol {
393404
let stdinPipe = Pipe()
394405
_process?.standardInput = stdinPipe
395406

407+
let group = DispatchGroup()
408+
409+
var stdout: [UInt8] = []
410+
let stdoutLock = Lock()
411+
412+
var stderr: [UInt8] = []
413+
let stderrLock = Lock()
414+
396415
if outputRedirection.redirectsOutput {
397416
let stdoutPipe = Pipe()
398417
let stderrPipe = Pipe()
418+
419+
group.enter()
399420
stdoutPipe.fileHandleForReading.readabilityHandler = { (fh : FileHandle) -> Void in
400-
let contents = fh.readDataToEndOfFile()
401-
self.outputRedirection.outputClosures?.stdoutClosure([UInt8](contents))
402-
if case .success(let data) = self.stdout.result {
403-
self.stdout.result = .success(data + contents)
421+
let data = fh.availableData
422+
if (data.count == 0) {
423+
stdoutPipe.fileHandleForReading.readabilityHandler = nil
424+
group.leave()
425+
} else {
426+
let contents = data.withUnsafeBytes { Array<UInt8>($0) }
427+
self.outputRedirection.outputClosures?.stdoutClosure(contents)
428+
stdoutLock.withLock {
429+
stdout += contents
430+
}
404431
}
405432
}
433+
434+
group.enter()
406435
stderrPipe.fileHandleForReading.readabilityHandler = { (fh : FileHandle) -> Void in
407-
let contents = fh.readDataToEndOfFile()
408-
self.outputRedirection.outputClosures?.stderrClosure([UInt8](contents))
409-
if case .success(let data) = self.stderr.result {
410-
self.stderr.result = .success(data + contents)
436+
let data = fh.availableData
437+
if (data.count == 0) {
438+
stderrPipe.fileHandleForReading.readabilityHandler = nil
439+
group.leave()
440+
} else {
441+
let contents = data.withUnsafeBytes { Array<UInt8>($0) }
442+
self.outputRedirection.outputClosures?.stderrClosure(contents)
443+
stderrLock.withLock {
444+
stderr += contents
445+
}
411446
}
412447
}
448+
413449
_process?.standardOutput = stdoutPipe
414450
_process?.standardError = stderrPipe
415451
}
416452

453+
// first set state then start reading threads
454+
let sync = DispatchGroup()
455+
sync.enter()
456+
self.stateLock.withLock {
457+
self.state = .readingOutputPipe(sync: sync)
458+
}
459+
460+
group.notify(queue: .global()) {
461+
self.stateLock.withLock {
462+
self.state = .outputReady(stdout: .success(stdout), stderr: .success(stderr))
463+
}
464+
sync.leave()
465+
}
466+
417467
try _process?.run()
418468
return stdinPipe.fileHandleForWriting
419-
#else
469+
#else
420470
// Initialize the spawn attributes.
421471
#if canImport(Darwin) || os(Android)
422472
var attributes: posix_spawnattr_t? = nil
@@ -547,72 +597,112 @@ public final class Process: ObjectIdentifierProtocol {
547597
// Close the local read end of the input pipe.
548598
try close(fd: stdinPipe[0])
549599

550-
if outputRedirection.redirectsOutput {
600+
if !outputRedirection.redirectsOutput {
601+
// no stdout or stderr in this case
602+
self.stateLock.withLock {
603+
self.state = .outputReady(stdout: .success([]), stderr: .success([]))
604+
}
605+
} else {
606+
var pending: Result<[UInt8], Swift.Error>?
607+
let pendingLock = Lock()
608+
551609
let outputClosures = outputRedirection.outputClosures
552610

553611
// Close the local write end of the output pipe.
554612
try close(fd: outputPipe[1])
555613

556614
// Create a thread and start reading the output on it.
557-
var thread = Thread { [weak self] in
615+
let stdoutThread = Thread { [weak self] in
558616
if let readResult = self?.readOutput(onFD: outputPipe[0], outputClosure: outputClosures?.stdoutClosure) {
559-
self?.stdout.result = readResult
617+
pendingLock.withLock {
618+
if let stderrResult = pending {
619+
self?.stateLock.withLock {
620+
self?.state = .outputReady(stdout: readResult, stderr: stderrResult)
621+
}
622+
} else {
623+
pending = readResult
624+
}
625+
}
626+
} else if let stderrResult = (pendingLock.withLock { pending }) {
627+
// TODO: this is more of an error
628+
self?.stateLock.withLock {
629+
self?.state = .outputReady(stdout: .success([]), stderr: stderrResult)
630+
}
560631
}
561632
}
562-
thread.start()
563-
self.stdout.thread = thread
564633

565634
// Only schedule a thread for stderr if no redirect was requested.
635+
var stderrThread: Thread? = nil
566636
if !outputRedirection.redirectStderr {
567637
// Close the local write end of the stderr pipe.
568638
try close(fd: stderrPipe[1])
569639

570640
// Create a thread and start reading the stderr output on it.
571-
thread = Thread { [weak self] in
641+
stderrThread = Thread { [weak self] in
572642
if let readResult = self?.readOutput(onFD: stderrPipe[0], outputClosure: outputClosures?.stderrClosure) {
573-
self?.stderr.result = readResult
643+
pendingLock.withLock {
644+
if let stdoutResult = pending {
645+
self?.stateLock.withLock {
646+
self?.state = .outputReady(stdout: stdoutResult, stderr: readResult)
647+
}
648+
} else {
649+
pending = readResult
650+
}
651+
}
652+
} else if let stdoutResult = (pendingLock.withLock { pending }) {
653+
// TODO: this is more of an error
654+
self?.stateLock.withLock {
655+
self?.state = .outputReady(stdout: stdoutResult, stderr: .success([]))
656+
}
574657
}
575658
}
576-
thread.start()
577-
self.stderr.thread = thread
659+
} else {
660+
pendingLock.withLock {
661+
pending = .success([]) // no stderr in this case
662+
}
663+
}
664+
// first set state then start reading threads
665+
self.stateLock.withLock {
666+
self.state = .readingOutputThread(stdout: stdoutThread, stderr: stderrThread)
578667
}
668+
stdoutThread.start()
669+
stderrThread?.start()
579670
}
671+
580672
return stdinStream
581-
#endif // POSIX implementation
673+
#endif // POSIX implementation
582674
}
583675

584676
/// Blocks the calling process until the subprocess finishes execution.
585677
@discardableResult
586678
public func waitUntilExit() throws -> ProcessResult {
587-
#if os(Windows)
588-
precondition(_process != nil, "The process is not yet launched.")
589-
let p = _process!
590-
p.waitUntilExit()
591-
stdout.thread?.join()
592-
stderr.thread?.join()
593-
594-
let executionResult = ProcessResult(
595-
arguments: arguments,
596-
environment: environment,
597-
exitStatusCode: p.terminationStatus,
598-
output: stdout.result,
599-
stderrOutput: stderr.result
600-
)
601-
return executionResult
602-
#else
603-
return try serialQueue.sync {
604-
precondition(launched, "The process is not yet launched.")
605-
606-
// If the process has already finsihed, return it.
607-
if let existingResult = _result {
608-
return existingResult
609-
}
610-
679+
self.stateLock.lock()
680+
switch self.state {
681+
case .idle:
682+
defer { self.stateLock.unlock() }
683+
preconditionFailure("The process is not yet launched.")
684+
case .complete(let result):
685+
defer { self.stateLock.unlock() }
686+
return result
687+
case .readingOutputThread(let stdoutThread, let stderrThread):
688+
self.stateLock.unlock() // unlock early since output read thread need to change state
611689
// If we're reading output, make sure that is finished.
612-
stdout.thread?.join()
613-
stderr.thread?.join()
614-
690+
stdoutThread.join()
691+
stderrThread?.join()
692+
return try self.waitUntilExit()
693+
case .readingOutputPipe(let sync):
694+
self.stateLock.unlock() // unlock early since output read thread need to change state
695+
sync.wait()
696+
return try self.waitUntilExit()
697+
case .outputReady(let stdoutResult, let stderrResult):
698+
defer { self.stateLock.unlock() }
615699
// Wait until process finishes execution.
700+
#if os(Windows)
701+
precondition(_process != nil, "The process is not yet launched.")
702+
let p = _process!
703+
p.waitUntilExit()
704+
let exitStatusCode = p.terminationStatus
705+
#else
616706
var exitStatusCode: Int32 = 0
617707
var result = waitpid(processID, &exitStatusCode, 0)
618708
while result == -1 && errno == EINTR {
@@ -621,19 +711,19 @@ public final class Process: ObjectIdentifierProtocol {
621711
if result == -1 {
622712
throw SystemError.waitpid(errno)
623713
}
714+
#endif
624715

625716
// Construct the result.
626717
let executionResult = ProcessResult(
627718
arguments: arguments,
628719
environment: environment,
629720
exitStatusCode: exitStatusCode,
630-
output: stdout.result,
631-
stderrOutput: stderr.result
721+
output: stdoutResult,
722+
stderrOutput: stderrResult
632723
)
633-
self._result = executionResult
724+
self.state = .complete(executionResult)
634725
return executionResult
635726
}
636-
#endif
637727
}
638728

639729
#if !os(Windows)
@@ -687,12 +777,12 @@ public final class Process: ObjectIdentifierProtocol {
687777
public func signal(_ signal: Int32) {
688778
#if os(Windows)
689779
if signal == SIGINT {
690-
_process?.interrupt()
780+
_process?.interrupt()
691781
} else {
692-
_process?.terminate()
782+
_process?.terminate()
693783
}
694784
#else
695-
assert(launched, "The process is not yet launched.")
785+
assert(self.launched, "The process is not yet launched.")
696786
_ = TSCLibc.kill(startNewProcessGroup ? -processID : processID, signal)
697787
#endif
698788
}

Tests/TSCBasicTests/ProcessSetTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class ProcessSetTests: XCTestCase {
5555
threadStartCondition.signal()
5656
}
5757
let result = try process.waitUntilExit()
58-
// Ensure we did termiated due to signal.
58+
// Ensure we did terminated due to signal.
5959
switch result.exitStatus {
6060
case .signalled: break
6161
default: XCTFail("Expected to exit via signal")

0 commit comments

Comments
 (0)