diff --git a/package-lock.json b/package-lock.json index bf72e39..96cc0f6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -22,6 +22,7 @@ "modern-errors": "^7.0.0", "modern-errors-bugs": "^5.0.0", "p-map": "^7.0.0", + "p-queue": "^8.1.0", "p-timeout": "^6.1.2", "path-exists": "^5.0.0", "path-type": "^6.0.0", @@ -3309,6 +3310,12 @@ "node": ">=0.10.0" } }, + "node_modules/eventemitter3": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-5.0.1.tgz", + "integrity": "sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==", + "license": "MIT" + }, "node_modules/execa": { "version": "9.5.2", "resolved": "https://registry.npmjs.org/execa/-/execa-9.5.2.tgz", @@ -5380,6 +5387,22 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/p-queue": { + "version": "8.1.0", + "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-8.1.0.tgz", + "integrity": "sha512-mxLDbbGIBEXTJL0zEx8JIylaj3xQ7Z/7eEVjcF9fJX4DBiH9oqe+oahYnlKKxm0Ci9TlWTyhSHgygxMxjIB2jw==", + "license": "MIT", + "dependencies": { + "eventemitter3": "^5.0.1", + "p-timeout": "^6.1.2" + }, + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/p-timeout": { "version": "6.1.4", "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-6.1.4.tgz", diff --git a/package.json b/package.json index 5f3df3d..a0939bf 100644 --- a/package.json +++ b/package.json @@ -101,6 +101,7 @@ "modern-errors": "^7.0.0", "modern-errors-bugs": "^5.0.0", "p-map": "^7.0.0", + "p-queue": "^8.1.0", "p-timeout": "^6.1.2", "path-exists": "^5.0.0", "path-type": "^6.0.0", diff --git a/src/scheduler/scheduler.ts b/src/scheduler/scheduler.ts index b00b799..6e4bd79 100644 --- a/src/scheduler/scheduler.ts +++ b/src/scheduler/scheduler.ts @@ -1,6 +1,7 @@ import { Duplex } from 'node:stream'; import pMap from 'p-map'; +import pQueue from 'p-queue'; import pTimeout from 'p-timeout'; import type { Logger } from 'winston'; @@ -39,26 +40,22 @@ export enum Strategy { } class TableResolverStream extends Duplex { - queue: unknown[] = []; - constructor() { super({ objectMode: true }); } - _read() { - while (this.queue.length > 0) { - this.push(this.queue.shift()); - } - if (this.writableEnded) { - // end readable stream if writable stream has ended - this.push(null); - } - } + _read() {} _write(chunk: unknown, _: string, next: (error?: Error | null) => void) { - this.queue.push(chunk); + this.emit('data', chunk); next(); } + + end(callback?: () => void): this { + this.emit('end'); + callback?.(); + return this; + } } const validateResource = (resource: Resource) => { @@ -92,20 +89,8 @@ const resolveTable = async ( ) => { logger.info(`resolving table ${table.name}`); const stream = new TableResolverStream(); - try { - await table.resolver(client, parent, stream); - } catch (error) { - const tableError = new SyncTableResolveError(`error resolving table ${table.name}`, { - cause: error, - props: { table, client }, - }); - logger.error(`error resolving table ${table.name}`, tableError); - return; - } finally { - stream.end(); - } - for await (const data of stream) { + const processData = async (data: unknown) => { logger.debug(`resolving resource for table ${table.name}`); const resolveResourceTimeout = 10 * 60 * 1000; const resource = new Resource(table, parent, data); @@ -118,7 +103,7 @@ const resolveTable = async ( props: { resource, table, client }, }); logger.error(preResolverError); - continue; + return; } try { @@ -128,7 +113,7 @@ const resolveTable = async ( await pTimeout(allColumnsPromise, { milliseconds: resolveResourceTimeout }); } catch (error) { logger.error(`error resolving columns for table ${table.name}`, error); - continue; + return; } try { @@ -139,7 +124,7 @@ const resolveTable = async ( props: { resource, table, client }, }); logger.error(postResolveError); - continue; + return; } setCQId(resource, deterministicCQId); @@ -148,7 +133,7 @@ const resolveTable = async ( validateResource(resource); } catch (error) { logger.error(error); - continue; + return; } try { @@ -161,7 +146,7 @@ const resolveTable = async ( }, }); logger.error(encodeError); - continue; + return; } logger.debug(`done resolving resource for table ${table.name}`); @@ -169,6 +154,28 @@ const resolveTable = async ( await pMap(table.relations, (child) => resolveTable(logger, client, child, resource, syncStream, deterministicCQId), ); + }; + + const queue = new pQueue({ concurrency: 5 }); + + stream.on('data', async (data) => { + await queue.add(() => processData(data)); + }); + + const resolverPromise = table.resolver(client, parent, stream); + + try { + await resolverPromise; + } catch (error) { + const tableError = new SyncTableResolveError(`error resolving table ${table.name}`, { + cause: error, + props: { table, client }, + }); + logger.error(`error resolving table ${table.name}`, tableError); + return; + } finally { + stream.end(); + await queue.onIdle(); } logger.info(`done resolving table ${table.name}`);