Skip to content

Commit 8a8a4e0

Browse files
committed
stream: readable read one buffer at a time
Instead of wasting cycles concatenating buffers, just return each one by one. Old behavior can be achieved by using `readable.read(readable.readableLength)` instead of `readable.read()`. PR: #60441
1 parent 0457bfe commit 8a8a4e0

16 files changed

+81
-46
lines changed

lib/internal/streams/readable.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,9 +636,14 @@ function howMuchToRead(n, state) {
636636
if ((state[kState] & kObjectMode) !== 0)
637637
return 1;
638638
if (NumberIsNaN(n)) {
639+
// Fast path for buffers.
640+
if ((state[kState] & kDecoder) === 0 && state.length)
641+
return state.buffer[state.bufferIndex].length;
642+
639643
// Only flow one buffer at a time.
640644
if ((state[kState] & kFlowing) !== 0 && state.length)
641645
return state.buffer[state.bufferIndex].length;
646+
642647
return state.length;
643648
}
644649
if (n <= state.length)

test/parallel/test-crypto-cipheriv-decipheriv.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,11 @@ function testCipher1(key, iv) {
3131
// quite small, so there's no harm.
3232
const cStream = crypto.createCipheriv('des-ede3-cbc', key, iv);
3333
cStream.end(plaintext);
34-
ciph = cStream.read();
34+
ciph = cStream.read(cStream.readableLength);
3535

3636
const dStream = crypto.createDecipheriv('des-ede3-cbc', key, iv);
3737
dStream.end(ciph);
38-
txt = dStream.read().toString('utf8');
38+
txt = dStream.read(dStream.readableLength).toString('utf8');
3939

4040
assert.strictEqual(txt, plaintext,
4141
`streaming cipher with key ${key} and iv ${iv}`);

test/parallel/test-runner-run.mjs

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import * as fixtures from '../common/fixtures.mjs';
33
import { join } from 'node:path';
44
import { describe, it, run } from 'node:test';
55
import { dot, spec, tap } from 'node:test/reporters';
6+
import consumers from 'node:stream/consumers';
67
import assert from 'node:assert';
78
import util from 'node:util';
89

@@ -111,34 +112,31 @@ describe('require(\'node:test\').run', { concurrency: true }, () => {
111112
describe('should be piped with spec reporter', () => {
112113
it('new spec', async () => {
113114
const specReporter = new spec();
114-
const result = await run({
115+
const result = await consumers.text(run({
115116
files: [join(testFixtures, 'default-behavior/test/random.cjs')]
116-
}).compose(specReporter).toArray();
117-
const stringResults = result.map((bfr) => bfr.toString());
118-
assert.match(stringResults[0], /this should pass/);
119-
assert.match(stringResults[1], /tests 1/);
120-
assert.match(stringResults[1], /pass 1/);
117+
}).compose(specReporter));
118+
assert.match(result, /this should pass/);
119+
assert.match(result, /tests 1/);
120+
assert.match(result, /pass 1/);
121121
});
122122

123123
it('spec()', async () => {
124124
const specReporter = spec();
125-
const result = await run({
125+
const result = await consumers.text(run({
126126
files: [join(testFixtures, 'default-behavior/test/random.cjs')]
127-
}).compose(specReporter).toArray();
128-
const stringResults = result.map((bfr) => bfr.toString());
129-
assert.match(stringResults[0], /this should pass/);
130-
assert.match(stringResults[1], /tests 1/);
131-
assert.match(stringResults[1], /pass 1/);
127+
}).compose(specReporter));
128+
assert.match(result, /this should pass/);
129+
assert.match(result, /tests 1/);
130+
assert.match(result, /pass 1/);
132131
});
133132

134133
it('spec', async () => {
135-
const result = await run({
134+
const result = await consumers.text(run({
136135
files: [join(testFixtures, 'default-behavior/test/random.cjs')]
137-
}).compose(spec).toArray();
138-
const stringResults = result.map((bfr) => bfr.toString());
139-
assert.match(stringResults[0], /this should pass/);
140-
assert.match(stringResults[1], /tests 1/);
141-
assert.match(stringResults[1], /pass 1/);
136+
}).compose(spec));
137+
assert.match(result, /this should pass/);
138+
assert.match(result, /tests 1/);
139+
assert.match(result, /pass 1/);
142140
});
143141
});
144142

test/parallel/test-stream-compose.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,7 @@ const assert = require('assert');
490490

491491
newStream.end();
492492

493-
assert.deepStrictEqual(await newStream.toArray(), [Buffer.from('Steve RogersOn your left')]);
493+
assert.deepStrictEqual(await newStream.toArray(), [Buffer.from('Steve Rogers'), Buffer.from('On your left')]);
494494
})().then(common.mustCall());
495495
}
496496

test/parallel/test-stream-push-strings.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ ms.on('readable', function() {
5959
results.push(String(chunk));
6060
});
6161

62-
const expect = [ 'first chunksecond to last chunk', 'last chunk' ];
62+
const expect = [ 'first chunk', 'second to last chunk', 'last chunk' ];
6363
process.on('exit', function() {
6464
assert.strictEqual(ms._chunks, -1);
6565
assert.deepStrictEqual(results, expect);

test/parallel/test-stream-readable-emittedReadable.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ const readable = new Readable({
1010
// Initialized to false.
1111
assert.strictEqual(readable._readableState.emittedReadable, false);
1212

13-
const expected = [Buffer.from('foobar'), Buffer.from('quo'), null];
13+
const expected = [Buffer.from('foo'), Buffer.from('bar'), Buffer.from('quo'), null];
1414
readable.on('readable', common.mustCall(() => {
1515
// emittedReadable should be true when the readable event is emitted
1616
assert.strictEqual(readable._readableState.emittedReadable, true);

test/parallel/test-stream-readable-infinite-read.js

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,17 @@ const readable = new Readable({
1010
highWaterMark: 16 * 1024,
1111
read: common.mustCall(function() {
1212
this.push(buf);
13-
}, 31)
13+
}, 13)
1414
});
1515

1616
let i = 0;
1717

1818
readable.on('readable', common.mustCall(function() {
19-
if (i++ === 10) {
19+
if (i++ > 10) {
2020
// We will just terminate now.
2121
process.removeAllListeners('readable');
2222
return;
2323
}
2424

25-
const data = readable.read();
26-
// TODO(mcollina): there is something odd in the highWaterMark logic
27-
// investigate.
28-
if (i === 1) {
29-
assert.strictEqual(data.length, 8192 * 2);
30-
} else {
31-
assert.strictEqual(data.length, 8192 * 3);
32-
}
33-
}, 11));
25+
assert.strictEqual(readable.read().length, 8192);
26+
}, 13));

test/parallel/test-stream-readable-needReadable.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ const asyncReadable = new Readable({
3232
});
3333

3434
asyncReadable.on('readable', common.mustCall(() => {
35-
if (asyncReadable.read() !== null) {
35+
if (asyncReadable.read(asyncReadable.readableLength) !== null) {
3636
// After each read(), the buffer is empty.
3737
// If the stream doesn't end now,
3838
// then we need to notify the reader on future changes.
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
5+
const { Readable } = require('stream');
6+
7+
// Read one buffer at a time and don't waste cycles allocating
8+
// and copying into a new larger buffer.
9+
{
10+
const r = new Readable({
11+
read() {}
12+
});
13+
const buffers = [Buffer.allocUnsafe(5), Buffer.allocUnsafe(10)];
14+
for (const buf of buffers) {
15+
r.push(buf);
16+
}
17+
for (const buf of buffers) {
18+
assert.strictEqual(r.read(), buf);
19+
}
20+
}

test/parallel/test-stream-typedarray.js

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,19 @@ const views = common.getArrayBufferViews(buffer);
8383
readable.push(views[2]);
8484
readable.unshift(views[0]);
8585

86-
const buf = readable.read();
86+
let buf;
87+
88+
buf = readable.read();
89+
assert(buf instanceof Buffer);
90+
assert.deepStrictEqual([...buf], [...views[0]]);
91+
92+
buf = readable.read();
93+
assert(buf instanceof Buffer);
94+
assert.deepStrictEqual([...buf], [...views[1]]);
95+
96+
buf = readable.read();
8797
assert(buf instanceof Buffer);
88-
assert.deepStrictEqual([...buf], [...views[0], ...views[1], ...views[2]]);
98+
assert.deepStrictEqual([...buf], [...views[2]]);
8999
}
90100

91101
{

0 commit comments

Comments
 (0)