1
- //! Module for anonymous pipe
1
+ //! A cross-platform anonymous pipe.
2
2
//!
3
- //! ```
4
- //! #![feature(anonymous_pipe)]
3
+ //! This module provides support for anonymous OS pipes, like [pipe] on Linux or [CreatePipe] on
4
+ //! Windows, which can be used as synchronous communication channels between related processes.
5
+ //!
6
+ //! # Behavior
7
+ //!
8
+ //! A pipe can be thought of as a bounded, interprocess [`mpsc`](crate::sync::mpsc), provided by
9
+ //! the OS, with a platform-dependent capacity. In particular:
10
+ //!
11
+ //! * A read on a [`PipeReader`] blocks until the pipe is non-empty.
12
+ //! * A write on a [`PipeWriter`] blocks when the pipe is full.
13
+ //! * When all copies of a [`PipeWriter`] are closed, a read on the corresponding [`PipeReader`]
14
+ //! returns EOF.
15
+ //! * [`PipeReader`] can be shared (by copying the underlying file descriptor), but only one process
16
+ //! will consume data in the pipe at any given time. See [`PipeReader::try_clone`] for an example
17
+ //! of this.
18
+ //!
19
+ //! # Capacity
20
+ //!
21
+ //! Pipe capacity is platform-dependent. To quote the Linux [man page]:
22
+ //!
23
+ //! > Different implementations have different limits for the pipe capacity. Applications should
24
+ //! > not rely on a particular capacity: an application should be designed so that a reading process
25
+ //! > consumes data as soon as it is available, so that a writing process does not remain blocked.
5
26
//!
27
+ //! # Examples
28
+ //!
29
+ //! ```no_run
30
+ //! #![feature(anonymous_pipe)]
6
31
//! # #[cfg(miri)] fn main() {}
7
32
//! # #[cfg(not(miri))]
33
+ //! # use std::process::Command;
34
+ //! # use std::io::{Read, Write};
8
35
//! # fn main() -> std::io::Result<()> {
9
- //! let (reader, writer) = std::pipe::pipe()?;
36
+ //! let (mut ping_rx, ping_tx) = std::pipe::pipe()?;
37
+ //! let (pong_rx, mut pong_tx) = std::pipe::pipe()?;
38
+ //!
39
+ //! let mut peer = Command::new("python")
40
+ //! .args([
41
+ //! "-c",
42
+ //! "from os import close\n\
43
+ //! from sys import stdin, stdout\n\
44
+ //! stdout.write('ping')\n\
45
+ //! stdout.flush()\n\
46
+ //! close(stdout.fileno())\n\
47
+ //! msg = stdin.read()\n\
48
+ //! assert(msg == 'pong')"
49
+ //! ])
50
+ //! .stdin(pong_rx)
51
+ //! .stdout(ping_tx)
52
+ //! .spawn()?;
53
+ //!
54
+ //! let mut msg = String::new();
55
+ //! // Block until peer's write end is closed.
56
+ //! ping_rx.read_to_string(&mut msg)?;
57
+ //! assert_eq!(&msg, "ping");
58
+ //!
59
+ //! pong_tx.write_all(b"pong")?;
60
+ //! // Close to unblock peer's read end.
61
+ //! drop(pong_tx);
62
+ //!
63
+ //! peer.wait()?;
10
64
//! # Ok(())
11
65
//! # }
12
66
//! ```
13
-
67
+ //! [pipe]: https://man7.org/linux/man-pages/man2/pipe.2.html
68
+ //! [CreatePipe]: https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-createpipe
69
+ //! [man page]: https://man7.org/linux/man-pages/man7/pipe.7.html
14
70
use crate :: io;
15
71
use crate :: sys:: anonymous_pipe:: { AnonPipe , pipe as pipe_inner} ;
16
72
17
73
/// Create anonymous pipe that is close-on-exec and blocking.
74
+ ///
75
+ /// # Examples
76
+ ///
77
+ /// See the [module-level](crate::pipe) documentation for examples.
18
78
#[ unstable( feature = "anonymous_pipe" , issue = "127154" ) ]
19
79
#[ inline]
20
80
pub fn pipe ( ) -> io:: Result < ( PipeReader , PipeWriter ) > {
@@ -33,6 +93,61 @@ pub struct PipeWriter(pub(crate) AnonPipe);
33
93
34
94
impl PipeReader {
35
95
/// Create a new [`PipeReader`] instance that shares the same underlying file description.
96
+ ///
97
+ /// # Examples
98
+ ///
99
+ /// ```no_run
100
+ /// #![feature(anonymous_pipe)]
101
+ /// # #[cfg(miri)] fn main() {}
102
+ /// # #[cfg(not(miri))]
103
+ /// # use std::process::Command;
104
+ /// # use std::io::{Read, Write};
105
+ /// # use std::fs::{self, File};
106
+ /// # fn main() -> std::io::Result<()> {
107
+ /// const NUM_SLOT: u8 = 2;
108
+ /// const NUM_PROC: u8 = 5;
109
+ /// const OUTPUT: &str = "output.txt";
110
+ ///
111
+ /// let jobserver = [b'|'; NUM_SLOT as usize];
112
+ /// let mut jobs = vec![];
113
+ /// let mut file = File::create_new(OUTPUT)?;
114
+ ///
115
+ /// let (reader, mut writer) = std::pipe::pipe()?;
116
+ ///
117
+ /// for j in 0..NUM_PROC {
118
+ /// jobs.push(
119
+ /// Command::new("python")
120
+ /// .args([
121
+ /// "-c",
122
+ /// &format!(
123
+ /// "from sys import stdout, stdin\n\
124
+ /// stdin.read(1)\n\
125
+ /// with open('{}', 'a') as fp: fp.write('x')\n\
126
+ /// stdout.write(b'|'.decode())",
127
+ /// OUTPUT
128
+ /// ),
129
+ /// ])
130
+ /// .stdout(writer.try_clone()?)
131
+ /// .stdin(reader.try_clone()?)
132
+ /// .spawn()?,
133
+ /// )
134
+ /// }
135
+ ///
136
+ /// writer.write_all(&jobserver)?;
137
+ ///
138
+ /// for mut job in jobs {
139
+ /// job.wait()?;
140
+ /// }
141
+ ///
142
+ /// let mut buf = String::new();
143
+ /// file.read_to_string(&mut buf)?;
144
+ ///
145
+ /// fs::remove_file(OUTPUT)?;
146
+ ///
147
+ /// assert_eq!(buf, "x".repeat(NUM_PROC.into()));
148
+ /// # Ok(())
149
+ /// # }
150
+ /// ```
36
151
#[ unstable( feature = "anonymous_pipe" , issue = "127154" ) ]
37
152
pub fn try_clone ( & self ) -> io:: Result < Self > {
38
153
self . 0 . try_clone ( ) . map ( Self )
@@ -41,6 +156,37 @@ impl PipeReader {
41
156
42
157
impl PipeWriter {
43
158
/// Create a new [`PipeWriter`] instance that shares the same underlying file description.
159
+ ///
160
+ /// # Examples
161
+ ///
162
+ /// ```no_run
163
+ /// #![feature(anonymous_pipe)]
164
+ /// # #[cfg(miri)] fn main() {}
165
+ /// # #[cfg(not(miri))]
166
+ /// # use std::process::Command;
167
+ /// # use std::io::Read;
168
+ /// # fn main() -> std::io::Result<()> {
169
+ /// let (mut reader, writer) = std::pipe::pipe()?;
170
+ ///
171
+ /// let mut peer = Command::new("python")
172
+ /// .args([
173
+ /// "-c",
174
+ /// "from sys import stdout, stderr\n\
175
+ /// stdout.write('foo')\n\
176
+ /// stderr.write('bar')"
177
+ /// ])
178
+ /// .stdout(writer.try_clone()?)
179
+ /// .stderr(writer)
180
+ /// .spawn()?;
181
+ ///
182
+ /// let mut msg = String::new();
183
+ /// reader.read_to_string(&mut msg)?;
184
+ /// assert_eq!(&msg, "foobar");
185
+ ///
186
+ /// peer.wait()?;
187
+ /// # Ok(())
188
+ /// # }
189
+ /// ```
44
190
#[ unstable( feature = "anonymous_pipe" , issue = "127154" ) ]
45
191
pub fn try_clone ( & self ) -> io:: Result < Self > {
46
192
self . 0 . try_clone ( ) . map ( Self )
0 commit comments