@@ -10,7 +10,7 @@ use tokio::task;
1010async fn main ( ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
1111 use rabbitmq_stream_client:: Environment ;
1212 let environment = Environment :: builder ( ) . build ( ) . await ?;
13- let stream = "stream-offset-tracking-rust " ;
13+ let stream = "pippo " ;
1414 let first_offset = Arc :: new ( AtomicI64 :: new ( -1 ) ) ;
1515 let last_offset = Arc :: new ( AtomicI64 :: new ( -1 ) ) ;
1616 let notify_on_close = Arc :: new ( Notify :: new ( ) ) ;
@@ -33,28 +33,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3333 }
3434 }
3535
36+ let stored_offset: u64 = 45 ;
3637 let mut consumer = environment
3738 . consumer ( )
3839 . name ( "consumer-1" )
39- . offset ( OffsetSpecification :: First )
40+ . offset ( OffsetSpecification :: Offset ( stored_offset ) )
4041 . build ( stream)
4142 . await
4243 . unwrap ( ) ;
4344
4445 println ! ( "Started consuming" ) ;
4546
46- let mut stored_offset: u64 = consumer. query_offset ( ) . await . unwrap_or_else ( |_| 0 ) ;
47+ /* let mut stored_offset: u64 = consumer.query_offset().await.unwrap_or_else(|_| 0);
4748
4849 if stored_offset > 0 {
4950 stored_offset += 1;
5051 }
5152 consumer = environment
5253 .consumer()
5354 .name("consumer-1")
54- . offset ( OffsetSpecification :: Offset ( stored_offset ) )
55+ .offset(OffsetSpecification::Offset(42 ))
5556 .build(stream)
5657 .await
57- . unwrap ( ) ;
58+ .unwrap();*/
5859
5960 let first_cloned_offset = first_offset. clone ( ) ;
6061 let last_cloned_offset = last_offset. clone ( ) ;
@@ -65,6 +66,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
6566 while let Some ( delivery) = consumer. next ( ) . await {
6667 let d = delivery. unwrap ( ) ;
6768
69+ println ! ( "offset {} " , d. offset( ) ) ;
6870 if first_offset. load ( Ordering :: Relaxed ) == -1 {
6971 println ! ( "First message received" ) ;
7072 _ = first_offset. compare_exchange (
@@ -74,19 +76,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
7476 Ordering :: Relaxed ,
7577 ) ;
7678 }
77- received_messages = received_messages + 1 ;
78- if received_messages % 10 == 0
79- || String :: from_utf8_lossy ( d. message ( ) . data ( ) . unwrap ( ) ) . contains ( "marker" )
79+ //received_messages = received_messages + 1;
80+ if String :: from_utf8_lossy ( d. message ( ) . data ( ) . unwrap ( ) ) . contains ( "marker" )
8081 {
81- let _ = consumer
82+ /* let _ = consumer
8283 .store_offset(d.offset())
8384 .await
84- . unwrap_or_else ( |e| println ! ( "Err: {}" , e) ) ;
85+ .unwrap_or_else(|e| println!("Err: {}", e));*/
8586 if String :: from_utf8_lossy ( d. message ( ) . data ( ) . unwrap ( ) ) . contains ( "marker" ) {
8687 last_offset. store ( d. offset ( ) as i64 , Ordering :: Relaxed ) ;
8788 let handle = consumer. handle ( ) ;
8889 _ = handle. close ( ) . await ;
8990 notify_on_close_cloned. notify_one ( ) ;
91+ break ;
9092 }
9193 }
9294 }
0 commit comments