Skip to content

Commit 575c8aa

Browse files
committed
make received_messages variable local to the consuming thread
1 parent 03a853b commit 575c8aa

File tree

1 file changed

+4
-5
lines changed

1 file changed

+4
-5
lines changed

rust-stream/src/bin/receive_offset_tracking.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
1111
use rabbitmq_stream_client::Environment;
1212
let environment = Environment::builder().build().await?;
1313
let stream = "stream-offset-tracking-rust";
14-
let received_messages = Arc::new(AtomicI64::new(-1));
1514
let first_offset = Arc::new(AtomicI64::new(-1));
1615
let last_offset = Arc::new(AtomicI64::new(-1));
1716
let notify_on_close = Arc::new(Notify::new());
@@ -46,7 +45,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4645

4746
let mut stored_offset: u64 = consumer.query_offset().await.unwrap_or_else(|_| 0);
4847

49-
if stored_offset > 0 {
48+
if stored_offset > 0 {
5049
stored_offset += 1;
5150
}
5251
consumer = environment
@@ -62,6 +61,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
6261
let notify_on_close_cloned = notify_on_close.clone();
6362

6463
task::spawn(async move {
64+
let mut received_messages = -1;
6565
while let Some(delivery) = consumer.next().await {
6666
let d = delivery.unwrap();
6767

@@ -74,8 +74,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
7474
Ordering::Relaxed,
7575
);
7676
}
77-
78-
if received_messages.fetch_add(1, Ordering::Relaxed) % 10 == 0
77+
received_messages = received_messages + 1;
78+
if received_messages % 10 == 0
7979
|| String::from_utf8_lossy(d.message().data().unwrap()).contains("marker")
8080
{
8181
let _ = consumer
@@ -87,7 +87,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
8787
let handle = consumer.handle();
8888
_ = handle.close().await;
8989
notify_on_close_cloned.notify_one();
90-
9190
}
9291
}
9392
}

0 commit comments

Comments
 (0)