diff --git a/src/Middleware/LimitConcurrentRequestsMiddleware.php b/src/Middleware/LimitConcurrentRequestsMiddleware.php index 827ce21d..4831fa2c 100644 --- a/src/Middleware/LimitConcurrentRequestsMiddleware.php +++ b/src/Middleware/LimitConcurrentRequestsMiddleware.php @@ -143,21 +143,39 @@ public function __invoke(ServerRequestInterface $request, $next) $queue[$id] = $deferred; $pending = &$this->pending; - return $this->await($deferred->promise()->then(function () use ($request, $next, $body, &$pending) { + $that = $this; + return $deferred->promise()->then(function () use ($request, $next, $body, &$pending, $that) { // invoke next request handler ++$pending; - $ret = $next($request); + + try { + $response = $next($request); + } catch (\Exception $e) { + $that->processQueue(); + throw $e; + } catch (\Throwable $e) { // @codeCoverageIgnoreStart + // handle Errors just like Exceptions (PHP 7+ only) + $that->processQueue(); + throw $e; // @codeCoverageIgnoreEnd + } // resume readable stream and replay buffered events if ($body instanceof PauseBufferStream) { $body->resumeImplicit(); } - return $ret; - })); + // if the next handler returns a pending promise, we have to + // await its resolution before invoking next queued request + return $that->await(Promise\resolve($response)); + }); } - private function await(PromiseInterface $promise) + /** + * @internal + * @param PromiseInterface $promise + * @return PromiseInterface + */ + public function await(PromiseInterface $promise) { $that = $this; diff --git a/tests/Middleware/LimitConcurrentRequestsMiddlewareTest.php b/tests/Middleware/LimitConcurrentRequestsMiddlewareTest.php index 98fe1b9a..859b82e7 100644 --- a/tests/Middleware/LimitConcurrentRequestsMiddlewareTest.php +++ b/tests/Middleware/LimitConcurrentRequestsMiddlewareTest.php @@ -296,6 +296,31 @@ public function testReceivesNextRequestAfterPreviousHandlerIsSettled() $middleware($request, $this->expectCallableOnceWith($request)); } + public function testReceivesNextRequestWhichThrowsAfterPreviousHandlerIsSettled() + { + $request = new ServerRequest( + 'POST', + 'http://example.com/', + array(), + 'hello' + ); + + $deferred = new Deferred(); + $middleware = new LimitConcurrentRequestsMiddleware(1); + $middleware($request, function () use ($deferred) { + return $deferred->promise(); + }); + + $second = $middleware($request, function () { + throw new \RuntimeException(); + }); + + $this->assertTrue($second instanceof PromiseInterface); + $second->then(null, $this->expectCallableOnce()); + + $deferred->reject(new \RuntimeException()); + } + public function testPendingRequestCanBeCancelledAndForwardsCancellationToInnerPromise() { $request = new ServerRequest( @@ -365,6 +390,50 @@ public function testReceivesNextRequestAfterPreviousHandlerIsCancelled() $middleware($request, $this->expectCallableOnceWith($request)); } + public function testRejectsWhenQueuedPromiseIsCancelled() + { + $request = new ServerRequest( + 'POST', + 'http://example.com/', + array(), + 'hello' + ); + + $deferred = new Deferred(); + $middleware = new LimitConcurrentRequestsMiddleware(1); + $first = $middleware($request, function () use ($deferred) { + return $deferred->promise(); + }); + + $second = $middleware($request, $this->expectCallableNever()); + + $this->assertTrue($second instanceof PromiseInterface); + $second->cancel(); + $second->then(null, $this->expectCallableOnce()); + } + + public function testDoesNotInvokeNextHandlersWhenQueuedPromiseIsCancelled() + { + $request = new ServerRequest( + 'POST', + 'http://example.com/', + array(), + 'hello' + ); + + $deferred = new Deferred(); + $middleware = new LimitConcurrentRequestsMiddleware(1); + $first = $middleware($request, function () use ($deferred) { + return $deferred->promise(); + }); + + $second = $middleware($request, $this->expectCallableNever()); + /* $third = */ $middleware($request, $this->expectCallableNever()); + + $this->assertTrue($second instanceof PromiseInterface); + $second->cancel(); + } + public function testReceivesStreamingBodyChangesInstanceWithCustomBodyButSameDataWhenDequeued() { $stream = new ThroughStream();