Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
"modern-errors": "^7.0.0",
"modern-errors-bugs": "^5.0.0",
"p-map": "^7.0.0",
"p-queue": "^8.1.0",
"p-timeout": "^6.1.2",
"path-exists": "^5.0.0",
"path-type": "^6.0.0",
Expand Down
67 changes: 37 additions & 30 deletions src/scheduler/scheduler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Duplex } from 'node:stream';

import pMap from 'p-map';
import pQueue from 'p-queue';
import pTimeout from 'p-timeout';
import type { Logger } from 'winston';

Expand Down Expand Up @@ -39,26 +40,22 @@ export enum Strategy {
}

class TableResolverStream extends Duplex {
queue: unknown[] = [];

constructor() {
super({ objectMode: true });
}

_read() {
while (this.queue.length > 0) {
this.push(this.queue.shift());
}
if (this.writableEnded) {
// end readable stream if writable stream has ended
this.push(null);
}
}
_read() {}

_write(chunk: unknown, _: string, next: (error?: Error | null) => void) {
this.queue.push(chunk);
this.emit('data', chunk);
next();
}

end(callback?: () => void): this {
this.emit('end');
callback?.();
return this;
}
}

const validateResource = (resource: Resource) => {
Expand Down Expand Up @@ -92,20 +89,8 @@ const resolveTable = async (
) => {
logger.info(`resolving table ${table.name}`);
const stream = new TableResolverStream();
try {
await table.resolver(client, parent, stream);
} catch (error) {
const tableError = new SyncTableResolveError(`error resolving table ${table.name}`, {
cause: error,
props: { table, client },
});
logger.error(`error resolving table ${table.name}`, tableError);
return;
} finally {
stream.end();
}

for await (const data of stream) {
const processData = async (data: unknown) => {
logger.debug(`resolving resource for table ${table.name}`);
const resolveResourceTimeout = 10 * 60 * 1000;
const resource = new Resource(table, parent, data);
Expand All @@ -118,7 +103,7 @@ const resolveTable = async (
props: { resource, table, client },
});
logger.error(preResolverError);
continue;
return;
}

try {
Expand All @@ -128,7 +113,7 @@ const resolveTable = async (
await pTimeout(allColumnsPromise, { milliseconds: resolveResourceTimeout });
} catch (error) {
logger.error(`error resolving columns for table ${table.name}`, error);
continue;
return;
}

try {
Expand All @@ -139,7 +124,7 @@ const resolveTable = async (
props: { resource, table, client },
});
logger.error(postResolveError);
continue;
return;
}

setCQId(resource, deterministicCQId);
Expand All @@ -148,7 +133,7 @@ const resolveTable = async (
validateResource(resource);
} catch (error) {
logger.error(error);
continue;
return;
}

try {
Expand All @@ -161,14 +146,36 @@ const resolveTable = async (
},
});
logger.error(encodeError);
continue;
return;
}

logger.debug(`done resolving resource for table ${table.name}`);

await pMap(table.relations, (child) =>
resolveTable(logger, client, child, resource, syncStream, deterministicCQId),
);
};

const queue = new pQueue({ concurrency: 5 });

stream.on('data', async (data) => {
await queue.add(() => processData(data));
});

const resolverPromise = table.resolver(client, parent, stream);

try {
await resolverPromise;
} catch (error) {
const tableError = new SyncTableResolveError(`error resolving table ${table.name}`, {
cause: error,
props: { table, client },
});
logger.error(`error resolving table ${table.name}`, tableError);
return;
} finally {
stream.end();
await queue.onIdle();
}

logger.info(`done resolving table ${table.name}`);
Expand Down