forked from amqp-rs/lapin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchannel_receiver_state.rs
121 lines (115 loc) · 3.85 KB
/
channel_receiver_state.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use crate::{
types::{ShortString, ShortUInt},
Result,
};
use std::collections::VecDeque;
/// Queue of [`ChannelReceiverState`] values.
///
/// The methods on this type append new entries at the back and consume the
/// entry at the front, so states are processed in the order they were queued.
#[derive(Debug, Default)]
pub(crate) struct ChannelReceiverStates(VecDeque<ChannelReceiverState>);
impl ChannelReceiverStates {
    /// Returns a copy of the state at the front of the queue (test-only helper).
    ///
    /// # Panics
    ///
    /// Panics if the queue is empty.
    #[cfg(test)]
    pub(crate) fn receiver_state(&self) -> ChannelReceiverState {
        self.0
            .front()
            .cloned()
            .expect("no receiver state is queued")
    }

    /// Queues a [`ChannelReceiverState::WillReceiveContent`] entry at the back.
    ///
    /// `class_id` is the class id the upcoming content header frame must carry;
    /// `queue_name` and `request_id_or_consumer_tag` are stored as-is and later
    /// handed to the handlers of [`Self::set_content_length`] and
    /// [`Self::receive`].
    pub(crate) fn set_will_receive(
        &mut self,
        class_id: ShortUInt,
        queue_name: Option<ShortString>,
        request_id_or_consumer_tag: Option<ShortString>,
    ) {
        self.0.push_back(ChannelReceiverState::WillReceiveContent(
            class_id,
            queue_name,
            request_id_or_consumer_tag,
        ));
    }

    /// Handles a content header frame announcing `length` bytes of body.
    ///
    /// The front state is popped and must be `WillReceiveContent` with a class
    /// id equal to `class_id`. In that case `handler` is called with the queue
    /// name, the request id / consumer tag and `confirm_mode`; if `length` is
    /// non-zero a `ReceivingContent` state expecting `length` bytes is put back
    /// at the front, otherwise nothing is put back.
    ///
    /// # Errors
    ///
    /// * Returns the result of `invalid_class_hanlder` (called with a
    ///   description) when the class ids differ.
    /// * Returns the result of `error_handler` (called with a description) when
    ///   the queue is empty or the front state is not `WillReceiveContent`.
    ///
    /// In both error cases the popped state is dropped, not restored.
    pub(crate) fn set_content_length<
        Handler: FnOnce(&Option<ShortString>, &Option<ShortString>, bool),
        OnInvalidClass: FnOnce(String) -> Result<()>,
        OnError: FnOnce(String) -> Result<()>,
    >(
        &mut self,
        channel_id: u16,
        class_id: ShortUInt,
        length: usize,
        handler: Handler,
        invalid_class_hanlder: OnInvalidClass,
        error_handler: OnError,
        confirm_mode: bool,
    ) -> Result<()> {
        match self.0.pop_front() {
            Some(ChannelReceiverState::WillReceiveContent(
                expected_class_id,
                queue_name,
                request_id_or_consumer_tag,
            )) => {
                if expected_class_id != class_id {
                    return invalid_class_hanlder(format!(
                        "content header frame with class id {} instead of {} received on channel {}",
                        class_id, expected_class_id, channel_id
                    ));
                }
                handler(&queue_name, &request_id_or_consumer_tag, confirm_mode);
                // A zero-length body has no body frames, so there is nothing
                // left to wait for and no state is re-queued.
                if length > 0 {
                    self.0.push_front(ChannelReceiverState::ReceivingContent(
                        queue_name,
                        request_id_or_consumer_tag,
                        length,
                    ));
                }
                Ok(())
            }
            // Empty queue, or a state other than `WillReceiveContent`.
            _ => error_handler(format!(
                "unexpected content header frame received on channel {}",
                channel_id
            )),
        }
    }

    /// Handles a content body frame carrying `length` bytes.
    ///
    /// The front state is popped and must be `ReceivingContent`. If `length`
    /// does not exceed the number of bytes still expected, `handler` is called
    /// with the queue name, the request id / consumer tag, the number of bytes
    /// remaining after this frame and `confirm_mode`; while bytes remain, an
    /// updated `ReceivingContent` state is put back at the front.
    ///
    /// # Errors
    ///
    /// Returns the result of `error_handler` (called with a description) when
    /// the frame is larger than the bytes still expected, when the queue is
    /// empty, or when the front state is not `ReceivingContent`. The popped
    /// state is dropped in those cases.
    pub(crate) fn receive<
        Handler: FnOnce(&Option<ShortString>, &Option<ShortString>, usize, bool),
        OnError: FnOnce(String) -> Result<()>,
    >(
        &mut self,
        channel_id: u16,
        length: usize,
        handler: Handler,
        error_handler: OnError,
        confirm_mode: bool,
    ) -> Result<()> {
        match self.0.pop_front() {
            Some(ChannelReceiverState::ReceivingContent(
                queue_name,
                request_id_or_consumer_tag,
                len,
            )) => {
                // `checked_sub` yields `None` when the frame carries more bytes
                // than are still expected.
                if let Some(remaining) = len.checked_sub(length) {
                    handler(
                        &queue_name,
                        &request_id_or_consumer_tag,
                        remaining,
                        confirm_mode,
                    );
                    if remaining > 0 {
                        self.0.push_front(ChannelReceiverState::ReceivingContent(
                            queue_name,
                            request_id_or_consumer_tag,
                            remaining,
                        ));
                    }
                    Ok(())
                } else {
                    error_handler(format!(
                        "unexpectedly large content body frame received on channel {} ({} bytes, expected {} bytes)",
                        channel_id, length, len
                    ))
                }
            }
            // Empty queue, or a state other than `ReceivingContent`.
            _ => error_handler(format!(
                "unexpected content body frame received on channel {}",
                channel_id
            )),
        }
    }
}
/// One step in receiving content, as tracked by `ChannelReceiverStates`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum ChannelReceiverState {
/// A content header frame is expected next.
///
/// Fields, in order: the class id the header must carry, the queue name, and
/// the request id or consumer tag.
WillReceiveContent(ShortUInt, Option<ShortString>, Option<ShortString>),
/// Content body frames are being received.
///
/// Fields, in order: the queue name, the request id or consumer tag, and the
/// number of body bytes still expected.
ReceivingContent(Option<ShortString>, Option<ShortString>, usize),
}