forked from amqp-rs/lapin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpublisher_confirms.rs
144 lines (130 loc) · 4.75 KB
/
publisher_confirms.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
use futures_executor::LocalPool;
use lapin::{
message::{BasicReturnMessage, Delivery, DeliveryResult},
options::*,
types::FieldTable,
BasicProperties, Connection, ConnectionProperties,
};
use log::info;
fn main() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info");
}
env_logger::init();
let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
LocalPool::new().run_until(async {
let conn = Connection::connect(&addr, ConnectionProperties::default())
.await
.expect("connection error");
info!("CONNECTED");
//send channel
let channel_a = conn.create_channel().await.expect("create_channel");
//receive channel
let channel_b = conn.create_channel().await.expect("create_channel");
info!("[{}] state: {:?}", line!(), conn.status().state());
//create the hello queue
let queue = channel_a
.queue_declare(
"hello",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.expect("queue_declare");
info!("[{}] state: {:?}", line!(), conn.status().state());
info!("[{}] declared queue: {:?}", line!(), queue);
channel_a
.confirm_select(ConfirmSelectOptions::default())
.await
.expect("confirm_select");
info!("[{}] state: {:?}", line!(), conn.status().state());
info!("Enabled publisher-confirms");
info!("will consume");
channel_b
.basic_consume(
"hello",
"my_consumer",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.expect("basic_consume")
.set_delegate(move |delivery: DeliveryResult| async move {
info!("received message: {:?}", delivery);
if let Ok(Some((channel, delivery))) = delivery {
channel
.basic_ack(delivery.delivery_tag, BasicAckOptions::default())
.await
.expect("basic_ack");
}
});
info!("[{}] state: {:?}", line!(), conn.status().state());
info!("will publish");
let payload = b"Hello world!";
let confirm = channel_a
.basic_publish(
"",
"hello",
BasicPublishOptions::default(),
payload.to_vec(),
BasicProperties::default(),
)
.await
.expect("basic_publish")
.await // Wait for this specific ack/nack
.expect("publisher-confirms");
assert!(confirm.is_ack());
assert_eq!(confirm.take_message(), None);
info!("[{}] state: {:?}", line!(), conn.status().state());
for _ in 1..=2 {
channel_a
.basic_publish(
"",
"hello",
BasicPublishOptions::default(),
payload.to_vec(),
BasicProperties::default(),
)
.await
.expect("basic_publish"); // Drop the PublisherConfirm instead for waiting for it ...
}
// ... and wait for all pending ack/nack afterwards instead of individually in the above loop
let returned = channel_a
.wait_for_confirms()
.await
.expect("wait for confirms");
assert!(returned.is_empty());
let confirm = channel_a
.basic_publish(
"",
"unroutable-routing-key-for-tests",
BasicPublishOptions {
mandatory: true,
..BasicPublishOptions::default()
},
payload.to_vec(),
BasicProperties::default().with_priority(42),
)
.await
.expect("basic_publish")
.await // Wait for this specific ack/nack
.expect("publisher-confirms");
assert!(confirm.is_ack());
assert_eq!(
confirm.take_message(),
Some(BasicReturnMessage {
delivery: Delivery {
delivery_tag: 0,
exchange: "".into(),
routing_key: "unroutable-routing-key-for-tests".into(),
redelivered: false,
properties: BasicProperties::default().with_priority(42),
data: payload.to_vec(),
},
reply_code: 312,
reply_text: "NO_ROUTE".into(),
})
);
let _ = channel_a;
})
}