-
-
Notifications
You must be signed in to change notification settings - Fork 93
/
Copy pathreconnect.rs
89 lines (74 loc) · 2.33 KB
/
reconnect.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
use futures_lite::stream::StreamExt;
use lapin::{
options::*, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Connection,
ConnectionProperties, Result,
};
use tracing::info;
/// Waits two seconds and then starts a new connection attempt for `addr`
/// by handing it to `try_rabbit_stuff`.
///
/// NOTE(review): the wait uses the blocking `std::thread::sleep`, so the
/// calling thread is parked for the whole delay. When this is reached from
/// the task spawned in `try_rabbit_stuff`, that is the thread polling the task.
fn retry_rabbit_stuff(addr: String) {
    let backoff = std::time::Duration::from_secs(2);
    std::thread::sleep(backoff);

    tracing::debug!("Reconnecting to rabbitmq");
    try_rabbit_stuff(addr);
}
/// Spawns a detached task on the global executor that runs `rabbit_stuff`
/// against `addr`.
///
/// If `rabbit_stuff` returns an error, the error is logged and
/// `retry_rabbit_stuff` is called with the same address to schedule another
/// attempt. This function itself returns immediately after spawning.
fn try_rabbit_stuff(addr: String) {
    let attempt = async move {
        // `rabbit_stuff` takes ownership of its argument, so hand it a copy
        // and keep the original around for a possible retry.
        match rabbit_stuff(addr.clone()).await {
            Ok(()) => {}
            Err(err) => {
                tracing::error!("Error: {}", err);
                retry_rabbit_stuff(addr);
            }
        }
    };

    async_global_executor::spawn(attempt).detach();
}
async fn rabbit_stuff(addr: String) -> Result<()> {
let conn = Connection::connect(&addr, ConnectionProperties::default()).await?;
info!("CONNECTED");
let channel_a = conn.create_channel().await?;
let channel_b = conn.create_channel().await?;
let queue = channel_a
.queue_declare(
"hello",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await?;
info!(?queue, "Declared queue");
let mut consumer = channel_b
.basic_consume(
"hello",
"my_consumer",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await?;
async_global_executor::spawn(async move {
info!("will consume");
while let Some(delivery) = consumer.next().await {
let delivery = delivery?;
delivery.ack(BasicAckOptions::default()).await?;
}
Ok::<(), lapin::Error>(())
})
.detach();
let payload = b"Hello world!";
loop {
let confirm = channel_a
.basic_publish(
"",
"hello",
BasicPublishOptions::default(),
payload,
BasicProperties::default(),
)
.await?
.await?;
assert_eq!(confirm, Confirmation::NotRequested);
}
}
/// Entry point: sets up logging, starts the first connection attempt, and
/// then parks the main thread on a future that never completes.
///
/// `RUST_LOG` is set to `"info"` when reading it fails. The broker address is
/// read from `AMQP_ADDR`, falling back to `amqp://127.0.0.1:5672/%2f`.
fn main() -> Result<()> {
    let log_filter = std::env::var("RUST_LOG");
    if log_filter.is_err() {
        std::env::set_var("RUST_LOG", "info");
    }

    tracing_subscriber::fmt::init();

    let addr = match std::env::var("AMQP_ADDR") {
        Ok(value) => value,
        Err(_) => "amqp://127.0.0.1:5672/%2f".into(),
    };

    try_rabbit_stuff(addr);

    // Never resolves; keeps the process alive while the spawned tasks run.
    let forever = std::future::pending::<Result<()>>();
    async_global_executor::block_on(forever)
}