Skip to content

Commit e3d12a1

Browse files
committedSep 1, 2020
feat(asasynciterable): add AsyncIterableTransform stream
AsyncIterableTransform allows constructing an AsyncIterableX instance from NodeJS ReadableStreams via pipe()
1 parent d53de73 commit e3d12a1

File tree

5 files changed

+134
-2
lines changed

5 files changed

+134
-2
lines changed
 

‎docs/asynciterable/converting.md

+22-1
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ We can then introduce IxJS by using the `fromNodeStream` which allows us then to
9797

9898
```typescript
9999
import * as fs from 'fs';
100-
import { fromNodeStream } from 'ix/asynciterable';
100+
import { fromNodeStream } from 'ix/asynciterable/fromnodestream';
101101

102102
const readable = fs.createReadStream('tmp.txt', {encoding: 'utf8'});
103103
const source = fromNodeStream(readable);
@@ -107,6 +107,27 @@ for await (const chunk in source) {
107107
}
108108
```
109109

110+
Or we can use `asAsyncIterable()` to take advantage of Node Streams' fluent `pipe` API:
111+
112+
```typescript
113+
import * as fs from 'fs';
114+
import { map } from 'ix/asynciterable/operators/map';
115+
import { flatMap } from 'ix/asynciterable/operators/flatmap';
116+
import { asAsyncIterable } from 'ix/asynciterable/asasynciterable';
117+
118+
const source = fs
119+
.createReadStream('tmp.txt', {encoding: 'utf8'})
120+
// Transform a Node stream into an AsyncIterable
121+
.pipe(asAsyncIterable({ objectMode: false }))
122+
// The result here is an AsyncIterableX
123+
.pipe(map((chunk, index) => `${index}: ${chunk}`));
124+
125+
for await (const chunk in source) {
126+
console.log(chunk);
127+
}
128+
129+
```
130+
110131
## Creating a sequence from Events
111132

112133
Although we traditionally think of events being push only such as Subject/Observer or Observables, we can also bridge to events using AsyncIterables. To do that, we have a couple of mechanisms, one called `fromEvent` which binds either your DOM `EventTarget` or Node.js `EventEmitter` to a given event. Then you can iterate over it just like any other async-iterable sequence.
+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import '../asynciterablehelpers';
2+
import { from } from 'ix/asynciterable';
3+
import { Readable, ReadableOptions } from 'stream';
4+
import { asAsyncIterable } from 'ix/asynciterable/asasynciterable';
5+
6+
// eslint-disable-next-line consistent-return
7+
(() => {
8+
if (!asAsyncIterable || process.env.TEST_NODE_STREAMS !== 'true') {
9+
return test('not testing node streams because process.env.TEST_NODE_STREAMS !== "true"', () => {
10+
/**/
11+
});
12+
}
13+
14+
class Counter extends Readable {
15+
private _index: number;
16+
private _max: number;
17+
18+
constructor(options?: ReadableOptions) {
19+
super(options);
20+
this._max = 3;
21+
this._index = 0;
22+
}
23+
24+
_read() {
25+
this.push(++this._index > this._max ? null : `${this._index}`);
26+
}
27+
}
28+
29+
const compare = (a: string, b: string) => Buffer.from(a).compare(Buffer.from(b)) === 0;
30+
31+
describe('AsyncIterable#fromNodeStream', () => {
32+
test('objectMode: true', async () => {
33+
const c = new Counter({ objectMode: true });
34+
const xs = c.pipe(
35+
asAsyncIterable<string>({ objectMode: true })
36+
);
37+
const expected = from(['1', '2', '3']);
38+
await expect(xs).toEqualStream(expected, compare);
39+
});
40+
41+
test('objectMode: false', async () => {
42+
const c = new Counter({ objectMode: false });
43+
const xs = c.pipe(
44+
asAsyncIterable<string>({ objectMode: false })
45+
);
46+
const expected = from(['123']);
47+
await expect(xs).toEqualStream(expected, compare);
48+
});
49+
});
50+
})();

‎src/Ix.node.ts

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ export * from './Ix';
22

33
export { IterableReadable } from './iterable/tonodestream';
44
export { AsyncIterableReadable } from './asynciterable/tonodestream';
5+
export { asAsyncIterable, AsyncIterableTransform } from './asynciterable/asasynciterable';
56
export { fromNodeStream, ReadableStreamAsyncIterable } from './asynciterable/fromnodestream';
67

78
import './add/asynciterable-operators/skipwhile';

‎src/asynciterable/asasynciterable.ts

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import { AsyncIterableX } from './asynciterablex';
2+
import { OperatorAsyncFunction, UnaryFunction } from '../interfaces';
3+
import { Transform, TransformCallback } from 'stream';
4+
5+
export interface TransformOptions {
6+
// ReadableOptions/WritableOptions
7+
highWaterMark?: number;
8+
objectMode?: boolean;
9+
autoDestroy?: boolean;
10+
// ReadableOptions
11+
encoding?: string;
12+
// WritableOptions
13+
decodeStrings?: boolean;
14+
defaultEncoding?: string;
15+
emitClose?: boolean;
16+
// DuplexOptions
17+
allowHalfOpen?: boolean;
18+
readableObjectMode?: boolean;
19+
writableObjectMode?: boolean;
20+
readableHighWaterMark?: number;
21+
writableHighWaterMark?: number;
22+
writableCorked?: number;
23+
}
24+
25+
export interface AsyncIterableTransform<T> extends AsyncIterableX<T>, Transform {
26+
pipe<R>(...operations: UnaryFunction<AsyncIterable<T>, R>[]): R;
27+
pipe<R>(...operations: OperatorAsyncFunction<T, R>[]): AsyncIterableX<R>;
28+
pipe<R extends NodeJS.WritableStream>(writable: R, options?: { end?: boolean }): R;
29+
[Symbol.asyncIterator](): AsyncIterableIterator<T>;
30+
}
31+
32+
const asyncIterableMixin = Symbol('asyncIterableMixin');
33+
34+
export class AsyncIterableTransform<T> extends Transform {
35+
private static [asyncIterableMixin] = false;
36+
constructor(options?: TransformOptions) {
37+
super(options);
38+
// If this is the first time AsyncIterableTransform is being constructed,
39+
// mixin the methods from the AsyncIterableX's prototype.
40+
if (!AsyncIterableTransform[asyncIterableMixin]) {
41+
AsyncIterableTransform[asyncIterableMixin] = true;
42+
Object.defineProperties(
43+
AsyncIterableTransform.prototype,
44+
Object.getOwnPropertyDescriptors(AsyncIterableX.prototype)
45+
);
46+
}
47+
}
48+
/** @nocollapse */
49+
_flush(callback: TransformCallback): void {
50+
callback();
51+
}
52+
/** @nocollapse */
53+
_transform(chunk: any, _encoding: string, callback: TransformCallback): void {
54+
callback(null, chunk);
55+
}
56+
}
57+
58+
export function asAsyncIterable<T>(options: TransformOptions = {}) {
59+
return new AsyncIterableTransform<T>(options);
60+
}

‎src/asynciterable/index.node.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
export * from './index';
2-
32
export * from './tonodestream';
43
export * from './fromnodestream';
4+
export * from './asasynciterable';

0 commit comments

Comments
 (0)
Please sign in to comment.