@@ -27,12 +27,12 @@ class VhostsConsumer extends Consumer
2727{
2828 protected const MAIN_HANDLER_LOCK = 'vhost_handler ' ;
2929
30- protected const CONSUME_BATCH_SIZE = 5 ;
31-
3230 private ?OutputStyle $ output = null ;
3331
3432 private ?ConsumeVhostsFiltersDto $ filtersDto = null ;
3533
34+ private int $ batchSize = 100 ;
35+
3636 private string $ configConnectionName = '' ;
3737
3838 private string $ currentConnectionName = '' ;
@@ -91,15 +91,19 @@ public function setFiltersDto(ConsumeVhostsFiltersDto $filtersDto): self
9191 return $ this ;
9292 }
9393
94- public function daemon ($ connectionName , $ queue , WorkerOptions $ options )
94+ /**
95+ * @param int $batchSize
96+ * @return $this
97+ */
98+ public function setBatchSize (int $ batchSize ): self
9599 {
96- $ this ->loadVhosts ();
97- if (false === $ this ->switchToNextVhost ()) {
98- // @todo load vhosts again
99- $ this ->output ->warning ('No active vhosts... Exit ' );
100+ $ this ->batchSize = $ batchSize ;
101+ return $ this ;
102+ }
100103
101- return ;
102- }
104+ public function daemon ($ connectionName , $ queue , WorkerOptions $ options )
105+ {
106+ $ this ->goAheadOrWait ();
103107
104108 $ this ->configConnectionName = (string ) $ connectionName ;
105109 $ this ->workerOptions = $ options ;
@@ -259,7 +263,7 @@ private function startConsuming()
259263 $ jobsProcessed
260264 ));
261265
262- if ($ jobsProcessed >= self :: CONSUME_BATCH_SIZE ) {
266+ if ($ jobsProcessed >= $ this -> batchSize ) {
263267 $ this ->processBatch ($ connection );
264268
265269 $ this ->stopConsuming ();
0 commit comments