-
-
Notifications
You must be signed in to change notification settings - Fork 93
/
Copy pathtokio.rs
74 lines (65 loc) · 2.11 KB
/
tokio.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
use lapin::{
message::DeliveryResult,
options::{BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions},
types::FieldTable,
BasicProperties, Connection, ConnectionProperties,
};
#[tokio::main]
async fn main() {
let uri = "amqp://localhost:5672";
let options = ConnectionProperties::default()
// Use tokio executor and reactor.
// At the moment the reactor is only available for unix.
.with_executor(tokio_executor_trait::Tokio::current())
.with_reactor(tokio_reactor_trait::Tokio);
let connection = Connection::connect(uri, options).await.unwrap();
let channel = connection.create_channel().await.unwrap();
let _queue = channel
.queue_declare(
"queue_test",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.unwrap();
let consumer = channel
.basic_consume(
"queue_test",
"tag_foo",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.unwrap();
consumer.set_delegate(move |delivery: DeliveryResult| async move {
let delivery = match delivery {
// Carries the delivery alongside its channel
Ok(Some(delivery)) => delivery,
// The consumer got canceled
Ok(None) => return,
// Carries the error and is always followed by Ok(None)
Err(error) => {
dbg!("Failed to consume queue message {}", error);
return;
}
};
// Do something with the delivery data (The message payload)
delivery
.ack(BasicAckOptions::default())
.await
.expect("Failed to ack send_webhook_event message");
});
channel
.basic_publish(
"",
"queue_test",
BasicPublishOptions::default(),
b"Hello world!",
BasicProperties::default(),
)
.await
.unwrap()
.await
.unwrap();
std::future::pending::<()>().await;
}