@@ -1072,10 +1072,7 @@ impl<N: Network> Reading for Gateway<N> {
1072
1072
type Message = Event < N > ;
1073
1073
1074
1074
/// The maximum queue depth of incoming messages for a single peer.
1075
- const MESSAGE_QUEUE_DEPTH : usize = 2
1076
- * BatchHeader :: < N > :: MAX_GC_ROUNDS
1077
- * N :: LATEST_MAX_CERTIFICATES ( ) . unwrap ( ) as usize
1078
- * BatchHeader :: < N > :: MAX_TRANSMISSIONS_PER_BATCH ;
1075
+ const MESSAGE_QUEUE_DEPTH : usize = 256_000 ;
1079
1076
1080
1077
/// Creates a [`Decoder`] used to interpret messages from the network.
1081
1078
/// The `side` param indicates the connection side **from the node's perspective**.
@@ -1107,10 +1104,7 @@ impl<N: Network> Writing for Gateway<N> {
1107
1104
type Message = Event < N > ;
1108
1105
1109
1106
/// The maximum queue depth of outgoing messages for a single peer.
1110
- const MESSAGE_QUEUE_DEPTH : usize = 2
1111
- * BatchHeader :: < N > :: MAX_GC_ROUNDS
1112
- * N :: LATEST_MAX_CERTIFICATES ( ) . unwrap ( ) as usize
1113
- * BatchHeader :: < N > :: MAX_TRANSMISSIONS_PER_BATCH ;
1107
+ const MESSAGE_QUEUE_DEPTH : usize = 256_000 ;
1114
1108
1115
1109
/// Creates an [`Encoder`] used to write the outbound messages to the target stream.
1116
1110
/// The `side` parameter indicates the connection side **from the node's perspective**.
@@ -1254,7 +1248,7 @@ impl<N: Network> Gateway<N> {
1254
1248
peer_ip : Option < SocketAddr > ,
1255
1249
restrictions_id : Field < N > ,
1256
1250
stream : & ' a mut TcpStream ,
1257
- ) -> io:: Result < ( SocketAddr , Framed < & mut TcpStream , EventCodec < N > > ) > {
1251
+ ) -> io:: Result < ( SocketAddr , Framed < & ' a mut TcpStream , EventCodec < N > > ) > {
1258
1252
// This value is immediately guaranteed to be present, so it can be unwrapped.
1259
1253
let peer_ip = peer_ip. unwrap ( ) ;
1260
1254
@@ -1319,7 +1313,7 @@ impl<N: Network> Gateway<N> {
1319
1313
peer_ip : & mut Option < SocketAddr > ,
1320
1314
restrictions_id : Field < N > ,
1321
1315
stream : & ' a mut TcpStream ,
1322
- ) -> io:: Result < ( SocketAddr , Framed < & mut TcpStream , EventCodec < N > > ) > {
1316
+ ) -> io:: Result < ( SocketAddr , Framed < & ' a mut TcpStream , EventCodec < N > > ) > {
1323
1317
// Construct the stream.
1324
1318
let mut framed = Framed :: new ( stream, EventCodec :: < N > :: handshake ( ) ) ;
1325
1319
@@ -1453,7 +1447,10 @@ mod prop_tests {
1453
1447
use snarkos_account:: Account ;
1454
1448
use snarkos_node_bft_ledger_service:: MockLedgerService ;
1455
1449
use snarkos_node_bft_storage_service:: BFTMemoryService ;
1456
- use snarkos_node_tcp:: P2P ;
1450
+ use snarkos_node_tcp:: {
1451
+ P2P ,
1452
+ protocols:: { Reading , Writing } ,
1453
+ } ;
1457
1454
use snarkvm:: {
1458
1455
ledger:: {
1459
1456
committee:: {
@@ -1692,4 +1689,15 @@ mod prop_tests {
1692
1689
}
1693
1690
}
1694
1691
}
1692
+
1693
+ #[ test]
1694
+ fn ensure_sufficient_rw_queue_depth ( ) {
1695
+ let desired_rw_queue_depth = 2
1696
+ * BatchHeader :: < MainnetV0 > :: MAX_GC_ROUNDS
1697
+ * MainnetV0 :: LATEST_MAX_CERTIFICATES ( ) . unwrap ( ) as usize
1698
+ * BatchHeader :: < MainnetV0 > :: MAX_TRANSMISSIONS_PER_BATCH ;
1699
+
1700
+ assert ! ( <Gateway <MainnetV0 > as Reading >:: MESSAGE_QUEUE_DEPTH >= desired_rw_queue_depth) ;
1701
+ assert ! ( <Gateway <MainnetV0 > as Writing >:: MESSAGE_QUEUE_DEPTH >= desired_rw_queue_depth) ;
1702
+ }
1695
1703
}
0 commit comments