Skip to content

Commit 78d2f21

Browse files
authored
Merge pull request #257 from unixslayer/keep-projection-running-as-long-events-are-loading
keep projections running as long events are loading
2 parents cb553d6 + 4cc9e4f commit 78d2f21

File tree

2 files changed

+89
-52
lines changed

2 files changed

+89
-52
lines changed

src/Projection/PdoEventStoreReadModelProjector.php

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -132,11 +132,6 @@ final class PdoEventStoreReadModelProjector implements MetadataAwareReadModelPro
132132
*/
133133
private $eventCounter = 0;
134134

135-
/**
136-
* @var int
137-
*/
138-
private $loadedEvents = 0;
139-
140135
/**
141136
* @var int
142137
*/
@@ -529,7 +524,7 @@ public function run(bool $keepRunning = true): void
529524
}
530525

531526
$streamEvents = new MergedStreamIterator(\array_keys($eventStreams), ...\array_values($eventStreams));
532-
$this->loadedEvents = $streamEvents->count();
527+
$loadedEvents = $streamEvents->count();
533528

534529
if ($singleHandler) {
535530
$gapDetected = ! $this->handleStreamWithSingleHandler($streamEvents);
@@ -586,9 +581,9 @@ public function run(bool $keepRunning = true): void
586581
}
587582

588583
$this->prepareStreamPositions();
589-
} while ($keepRunning && ! $this->isStopped);
584+
} while (($keepRunning || $loadedEvents > 0) && ! $this->isStopped);
590585
} finally {
591-
$this->releaseLock($keepRunning);
586+
$this->releaseLock($keepRunning, $loadedEvents);
592587
}
593588
}
594589

@@ -915,7 +910,7 @@ private function updateLock(): void
915910
$this->lastLockUpdate = $now;
916911
}
917912

918-
private function releaseLock(bool $keepRunning): void
913+
private function releaseLock(bool $keepRunning, int $loadedEvents): void
919914
{
920915
$projectionsTable = $this->quoteTableName($this->projectionsTable);
921916
$sql = <<<EOT
@@ -924,7 +919,7 @@ private function releaseLock(bool $keepRunning): void
924919

925920
$statement = $this->connection->prepare($sql);
926921

927-
$status = $keepRunning && $this->loadedEvents > 0 ? ProjectionStatus::RUNNING() : ProjectionStatus::IDLE();
922+
$status = ($keepRunning || $loadedEvents > 0) ? ProjectionStatus::RUNNING() : ProjectionStatus::IDLE();
928923

929924
try {
930925
$statement->execute([$status->getValue(), $this->name]);

tests/Projection/AbstractPdoEventStoreReadModelProjectorTestCase.php

Lines changed: 84 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
namespace ProophTest\EventStore\Pdo\Projection;
1515

1616
use ArrayIterator;
17+
use Assert\Assertion;
1718
use DateInterval;
1819
use DateTimeImmutable;
1920
use DateTimeZone;
@@ -27,6 +28,7 @@
2728
use Prooph\EventStore\Pdo\Projection\GapDetection;
2829
use Prooph\EventStore\Pdo\Projection\PdoEventStoreProjector;
2930
use Prooph\EventStore\Pdo\Projection\PdoEventStoreReadModelProjector;
31+
use Prooph\EventStore\Projection\ProjectionStatus;
3032
use Prooph\EventStore\Projection\Projector;
3133
use Prooph\EventStore\Projection\ReadModel;
3234
use Prooph\EventStore\Stream;
@@ -456,27 +458,21 @@ public function it_detects_gap_and_performs_retry(): void
456458
$projection
457459
->fromStream('user')
458460
->init(function () {
459-
return [];
461+
return ['iteration' => 0];
460462
})
461-
->when([
462-
UserCreated::class => function (array $state, Message $event): array {
463-
return $state;
464-
},
465-
UsernameChanged::class => function (array $state, Message $event): array {
466-
return $state;
467-
},
468-
])
469-
->run(false);
470-
471-
$this->assertEquals(1, $projectionManager->fetchProjectionStreamPositions('test_projection')['user']);
472-
473-
$this->assertTrue($gapDetection->isRetrying());
463+
->whenAny(
464+
function (array $state, Message $event) use ($projectionManager, $gapDetection, $parallelConnection): array {
465+
if ($state['iteration'] === 1) {
466+
Assertion::true($gapDetection->isRetrying());
467+
$parallelConnection->commit();
468+
}
474469

475-
// Fill the gap
476-
$parallelConnection->commit();
470+
++$state['iteration'];
477471

478-
// Run again with gap detection in retry mode
479-
$projection->run(false);
472+
return $state;
473+
}
474+
)
475+
->run(false);
480476

481477
$this->assertEquals(3, $projectionManager->fetchProjectionStreamPositions('test_projection')['user']);
482478

@@ -521,36 +517,25 @@ public function it_continues_when_retry_limit_is_reached_and_gap_not_filled(): v
521517
$projection
522518
->fromStream('user')
523519
->init(function () {
524-
return [];
520+
return ['iteration' => 0];
525521
})
526-
->when([
527-
UserCreated::class => function (array $state, Message $event): array {
528-
return $state;
529-
},
530-
UsernameChanged::class => function (array $state, Message $event): array {
531-
return $state;
532-
},
533-
])
534-
->run(false);
535-
536-
$this->assertEquals(1, $projectionManager->fetchProjectionStreamPositions('test_projection')['user']);
537-
538-
$this->assertTrue($gapDetection->isRetrying());
539-
540-
// Force a real gap
541-
$parallelConnection->rollBack();
542-
543-
// Run again with gap detection in retry mode
544-
$projection->run(false);
522+
->whenAny(
523+
function (array $state, Message $event) use ($projectionManager, $gapDetection, $parallelConnection): array {
524+
if ($state['iteration'] === 1) {
525+
Assertion::true($gapDetection->isRetrying());
526+
$parallelConnection->rollBack();
527+
}
545528

546-
// Projection should not move forward, but instead retry a second time
547-
$this->assertEquals(1, $projectionManager->fetchProjectionStreamPositions('test_projection')['user']);
529+
++$state['iteration'];
548530

549-
// Third run with gap detection still in retry mode, but limit reached
550-
$projection->run(false);
531+
return $state;
532+
}
533+
)
534+
->run(false);
551535

552536
//Projection should have moved forward
553537
$this->assertEquals(3, $projectionManager->fetchProjectionStreamPositions('test_projection')['user']);
538+
$this->assertEquals(ProjectionStatus::IDLE(), $projectionManager->fetchProjectionStatus('test_projection'));
554539

555540
$this->assertFalse($gapDetection->isRetrying());
556541
}
@@ -632,4 +617,61 @@ protected function prepareEventStreamWithOneEvent(string $name, ?DateTimeImmutab
632617

633618
$this->eventStore->create(new Stream(new StreamName($name), new ArrayIterator($events)));
634619
}
620+
621+
#[Test]
622+
public function projection_should_run_until_end_of_stream(): void
623+
{
624+
$this->prepareEventStream('user-345');
625+
626+
$projectionManager = $this->projectionManager;
627+
$projection = $projectionManager->createReadModelProjection('test_projection', new ReadModelMock(), [
628+
Projector::OPTION_PERSIST_BLOCK_SIZE => 1,
629+
PdoEventStoreReadModelProjector::OPTION_LOAD_COUNT => 1,
630+
]);
631+
632+
$projection
633+
->fromStream('user-345')
634+
->whenAny(function () {
635+
})
636+
->run(false);
637+
638+
$this->assertEquals(50, $projectionManager->fetchProjectionStreamPositions('test_projection')['user-345']);
639+
$this->assertEquals(ProjectionStatus::IDLE(), $projectionManager->fetchProjectionStatus('test_projection'));
640+
}
641+
642+
#[Test]
643+
public function when_failed_projection_should_release_lock_but_indicate_running_status(): void
644+
{
645+
$this->prepareEventStream('user-345');
646+
647+
$projectionManager = $this->projectionManager;
648+
$projection = $projectionManager->createReadModelProjection('test_projection', new ReadModelMock(), [
649+
Projector::OPTION_PERSIST_BLOCK_SIZE => 1,
650+
PdoEventStoreReadModelProjector::OPTION_LOAD_COUNT => 1,
651+
]);
652+
653+
$projection
654+
->fromStream('user-345')
655+
->init(function () {
656+
return ['iteration' => 0];
657+
})
658+
->whenAny(function (array $state, Message $event): array {
659+
++$state['iteration'];
660+
661+
if ($state['iteration'] > 5) {
662+
throw new \RuntimeException('something happened');
663+
}
664+
665+
return $state;
666+
});
667+
668+
try {
669+
$projection->run(false);
670+
} catch (\Throwable) {
671+
}
672+
673+
$this->assertEquals(5, $projectionManager->fetchProjectionStreamPositions('test_projection')['user-345']);
674+
$this->assertEquals(ProjectionStatus::RUNNING(), $projectionManager->fetchProjectionStatus('test_projection'));
675+
$this->assertNull($this->connection->query("select locked_until from projections where name = 'test_projection'")->fetch(PDO::FETCH_COLUMN));
676+
}
635677
}

0 commit comments

Comments
 (0)