Skip to content
This repository was archived by the owner on Aug 4, 2023. It is now read-only.

Commit 403cd99

Browse files
committed
read before readable event, detach all listeners
1 parent 3655c41 commit 403cd99

File tree

4 files changed

+193
-70
lines changed

4 files changed

+193
-70
lines changed

src/Node/Stream/Aff.purs

Lines changed: 159 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,13 @@ import Data.Maybe (Maybe(..))
9898
import Effect (Effect, untilE)
9999
import Effect.Aff (effectCanceler, makeAff)
100100
import Effect.Aff.Class (class MonadAff, liftAff)
101-
import Effect.Console as Console
102101
import Effect.Exception (catchException)
103102
import Node.Buffer (Buffer)
104103
import Node.Buffer as Buffer
105104
import Node.Encoding (Encoding(..))
106105
import Node.Stream (Readable, Writable)
107106
import Node.Stream as Stream
108-
import Node.Stream.Aff.Internal (clearInterval, hasRef, setInterval)
107+
import Node.Stream.Aff.Internal (clearInterval, hasRef, onceReadable, setInterval)
109108
import Node.Stream.Aff.Internal (onceDrain, onceEnd, onceError, onceReadable)
110109
import Node.Stream.Aff.Internal (unbuffer) as Reexport
111110

@@ -133,22 +132,40 @@ readSome_
133132
-> (Readable r -> Effect Unit)
134133
-> m (Array Buffer)
135134
readSome_ r canceller = liftAff <<< makeAff $ \res -> do
135+
bufs <- liftST $ Array.ST.new
136+
137+
removeError <- onceError r $ res <<< Left
138+
139+
-- try to read right away.
140+
catchException (res <<< Left) do
141+
untilE do
142+
Stream.read r Nothing >>= case _ of
143+
Nothing -> pure true
144+
Just chunk -> do
145+
void $ liftST $ Array.ST.push chunk bufs
146+
pure false
147+
148+
ret1 <- liftST $ Array.ST.unsafeFreeze bufs
149+
if Array.length ret1 == 0 then do
150+
-- if we couldn't read anything right away, then wait until the stream is readable.
151+
-- “The 'readable' event will also be emitted once the end of the
152+
-- stream data has been reached but before the 'end' event is emitted.”
153+
void $ onceReadable r do
154+
catchException (res <<< Left) do
155+
untilE do
156+
Stream.read r Nothing >>= case _ of
157+
Nothing -> pure true
158+
Just chunk -> do
159+
void $ liftST $ Array.ST.push chunk bufs
160+
pure false
161+
ret2 <- liftST $ Array.ST.unsafeFreeze bufs
162+
removeError
163+
res (Right ret2)
136164

137-
onceError r $ res <<< Left
138-
139-
onceReadable r do
140-
catchException (res <<< Left) do
141-
bufs <- liftST $ Array.ST.new
142-
untilE do
143-
Stream.read r Nothing >>= case _ of
144-
-- “The 'readable' event will also be emitted once the end of the
145-
-- stream data has been reached but before the 'end' event is emitted.”
146-
Nothing -> pure true
147-
Just chunk -> do
148-
void $ liftST $ Array.ST.push chunk bufs
149-
pure false
150-
ret <- liftST $ Array.ST.unsafeFreeze bufs
151-
res $ Right ret
165+
-- return what we read right away
166+
else do
167+
removeError
168+
res (Right ret1)
152169

153170
pure $ effectCanceler (canceller r)
154171

@@ -172,27 +189,44 @@ readAll_
172189
readAll_ r canceller = liftAff <<< makeAff $ \res -> do
173190
bufs <- liftST $ Array.ST.new
174191

175-
onceError r $ res <<< Left
192+
removeError <- onceError r $ res <<< Left
176193

177-
onceEnd r do
194+
removeEnd <- onceEnd r do
195+
removeError
178196
ret <- liftST $ Array.ST.unsafeFreeze bufs
179-
res $ Right ret
197+
res (Right ret)
180198

181199
let
182-
oneRead = do
183-
onceReadable r do
200+
cleanupRethrow err = do
201+
removeError
202+
removeEnd
203+
res (Left err)
204+
205+
-- try to read right away.
206+
catchException cleanupRethrow do
207+
untilE do
208+
Stream.read r Nothing >>= case _ of
209+
Nothing -> pure true
210+
Just chunk -> do
211+
void $ liftST $ Array.ST.push chunk bufs
212+
pure false
213+
214+
-- then wait for the stream to be readable until the stream has ended.
215+
let
216+
waitToRead = do
217+
void $ onceReadable r do
184218
-- “The 'readable' event will also be emitted once the end of the
185219
-- stream data has been reached but before the 'end' event is emitted.”
186-
catchException (res <<< Left) do
220+
catchException cleanupRethrow do
187221
untilE do
188222
Stream.read r Nothing >>= case _ of
189223
Nothing -> pure true
190224
Just chunk -> do
191225
_ <- liftST $ Array.ST.push chunk bufs
192226
pure false
193-
oneRead -- this is not recursion
227+
waitToRead -- this is not recursion
194228

195-
oneRead
229+
waitToRead
196230
pure $ effectCanceler (canceller r)
197231

198232

@@ -223,42 +257,97 @@ readN_ r canceller n = liftAff <<< makeAff $ \res -> do
223257
redRef <- liftST $ STRef.new 0
224258
bufs <- liftST $ Array.ST.new
225259

226-
onceError r $ res <<< Left
260+
-- TODO on error, we're not calling removeEnd...
261+
removeError <- onceError r $ res <<< Left
227262

228263
-- The `end` event is sometimes raised after we have read N bytes, even
229264
-- if there are more bytes in the stream?
230-
onceEnd r do
265+
removeEnd <- onceEnd r do
266+
removeError
231267
ret <- liftST $ Array.ST.unsafeFreeze bufs
232-
res $ Right ret
268+
res (Right ret)
233269

234270
let
235-
oneRead = do
236-
onceReadable r do
237-
catchException (res <<< Left) do
238-
untilE do
239-
red <- liftST $ STRef.read redRef
240-
-- https://nodejs.org/docs/latest-v15.x/api/stream.html#stream_readable_read_size
241-
-- “If size bytes are not available to be read, null will be returned
242-
-- unless the stream has ended, in which case all of the data remaining
243-
-- in the internal buffer will be returned.”
244-
Stream.read r (Just (n-red)) >>= case _ of
245-
Nothing -> pure true
246-
Just chunk -> do
247-
_ <- liftST $ Array.ST.push chunk bufs
248-
s <- Buffer.size chunk
249-
red' <- liftST $ STRef.modify (_+s) redRef
250-
if red' >= n then
251-
pure true
252-
else
253-
pure false
271+
cleanupRethrow err = do
272+
removeError
273+
removeEnd
274+
res (Left err)
275+
276+
-- try to read N bytes and then either return N bytes or run a continuation
277+
tryToRead continuation = do
278+
catchException cleanupRethrow do
279+
untilE do
254280
red <- liftST $ STRef.read redRef
255-
if red >= n then do
256-
ret <- liftST $ Array.ST.unsafeFreeze bufs
257-
res $ Right ret
258-
else
259-
oneRead -- this is not recursion
260-
261-
oneRead
281+
-- https://nodejs.org/docs/latest-v15.x/api/stream.html#stream_readable_read_size
282+
-- “If size bytes are not available to be read, null will be returned
283+
-- unless the stream has ended, in which case all of the data remaining
284+
-- in the internal buffer will be returned.”
285+
Stream.read r (Just (n-red)) >>= case _ of
286+
Nothing -> pure true
287+
Just chunk -> do
288+
_ <- liftST $ Array.ST.push chunk bufs
289+
s <- Buffer.size chunk
290+
red' <- liftST $ STRef.modify (_+s) redRef
291+
if red' >= n then
292+
pure true
293+
else
294+
pure false
295+
red <- liftST $ STRef.read redRef
296+
if red >= n then do
297+
removeError
298+
removeEnd
299+
ret <- liftST $ Array.ST.unsafeFreeze bufs
300+
res (Right ret)
301+
else
302+
continuation unit
303+
304+
-- try to read right away.
305+
tryToRead (\_ -> pure unit)
306+
307+
-- if there were not enough bytes right away, then wait for bytes to come in.
308+
let
309+
waitToRead _ = do
310+
void $ onceReadable r do
311+
tryToRead waitToRead
312+
waitToRead unit
313+
314+
-- fix \waitToRead -> do
315+
-- void $ onceReadable r do
316+
-- tryToRead waitToRead
317+
-- let
318+
-- waitToRead = do
319+
-- void $ onceReadable r do
320+
-- tryToRead waitToRead -- this is not recursion
321+
322+
323+
-- let
324+
-- oneRead = do
325+
-- void $ onceReadable r do
326+
-- catchException cleanupRethrow do
327+
-- untilE do
328+
-- red <- liftST $ STRef.read redRef
329+
-- -- https://nodejs.org/docs/latest-v15.x/api/stream.html#stream_readable_read_size
330+
-- -- “If size bytes are not available to be read, null will be returned
331+
-- -- unless the stream has ended, in which case all of the data remaining
332+
-- -- in the internal buffer will be returned.”
333+
-- Stream.read r (Just (n-red)) >>= case _ of
334+
-- Nothing -> pure true
335+
-- Just chunk -> do
336+
-- _ <- liftST $ Array.ST.push chunk bufs
337+
-- s <- Buffer.size chunk
338+
-- red' <- liftST $ STRef.modify (_+s) redRef
339+
-- if red' >= n then
340+
-- pure true
341+
-- else
342+
-- pure false
343+
-- red <- liftST $ STRef.read redRef
344+
-- if red >= n then do
345+
-- ret <- liftST $ Array.ST.unsafeFreeze bufs
346+
-- res $ Right ret
347+
-- else
348+
-- oneRead -- this is not recursion
349+
350+
-- waitToRead
262351
pure $ effectCanceler (canceller r)
263352

264353

@@ -282,7 +371,7 @@ write_
282371
write_ w canceller bs = liftAff <<< makeAff $ \res -> do
283372
bufs <- liftST $ Array.ST.thaw bs
284373

285-
onceError w $ res <<< Left
374+
removeError <- onceError w $ res <<< Left
286375

287376
let
288377
callback = case _ of
@@ -306,12 +395,26 @@ write_ w canceller bs = liftAff <<< makeAff $ \res -> do
306395
if nobackpressure then do
307396
pure false
308397
else do
309-
onceDrain w oneWrite
398+
_ <- onceDrain w oneWrite
310399
pure true
311400

312401
oneWrite
402+
removeError
313403
pure $ effectCanceler (canceller w)
314404

405+
406+
-- TODO Remove all listeners before returning
407+
-- https://nodejs.org/api/events.html#emitterremovelistenereventname-listener
408+
--
409+
-- (node:483527) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 end listeners added to [Socket]. Use emitter.setMaxListeners() to increase limit
410+
-- (Use `node --trace-warnings ...` to show where the warning was created)
411+
--
412+
-- Buffer.size 28
413+
--
414+
-- (node:483527) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 error listeners added to [Socket]. Use emitter.setMaxListeners() to increase limit
415+
416+
417+
315418
-- | https://github.com/purescript-contrib/pulp/blob/79dd954c86a5adc57051cad127c8888756f680a6/src/Pulp/System/Stream.purs#L41
316419
write' :: forall m w. MonadAff m => Writable w -> String -> m Unit
317420
write' stream str = liftAff $ makeAff (\cb -> mempty <* void (Stream.writeString stream UTF8 str (\_ -> cb (Right unit))))

src/Node/Stream/Internal.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,33 @@ import timers from 'node:timers';
33
export function onceReadable(s) {
44
return f => () => {
55
s.once('readable', f);
6+
return () => {s.removeListener('readable', f);};
67
};
78
}
89

910
export function onceEnd(s) {
1011
return f => () => {
1112
s.once('end', f);
13+
return () => {s.removeListener('end', f);};
1214
};
1315
}
1416

1517
export function onceDrain(s) {
1618
return f => () => {
1719
s.once('drain', f);
20+
return () => {s.removeListener('drain', f);};
1821
};
1922
}
2023

2124
export function onceError(s) {
2225
return f => () => {
2326
s.once('error', f);
27+
return () => {s.removeListener('error', f);};
2428
};
2529
}
2630

31+
32+
2733
export function unbuffer(s) {
2834
// https://github.com/nodejs/node/issues/6456
2935
// https://github.com/nodejs/node/issues/6379#issuecomment-1064044886

src/Node/Stream/Internal.purs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,35 +17,49 @@ import Effect (Effect)
1717
import Effect.Exception (Error)
1818
import Node.Stream (Readable, Stream, Writable)
1919

20-
-- | Listen for one `readable` event, call the callback, then detach
21-
-- | the `readable` event listener.
20+
-- | Listen for one `readable` event, call the callback, then remove
21+
-- | the event listener.
22+
-- |
23+
-- | Returns an effect for removing the event listener even if no event
24+
-- | was raised.
2225
foreign import onceReadable
2326
:: forall r
2427
. Readable r
2528
-> Effect Unit
26-
-> Effect Unit
29+
-> Effect (Effect Unit)
2730

28-
-- | Listen for one `end` event, call the callback, then detach
29-
-- | the `end` event listener.
31+
-- | Listen for one `end` event, call the callback, then remove
32+
-- | the event listener.
33+
-- |
34+
-- | Returns an effect for removing the event listener even if no event
35+
-- | was raised.
3036
foreign import onceEnd
3137
:: forall r
3238
. Readable r
3339
-> Effect Unit
34-
-> Effect Unit
40+
-> Effect (Effect Unit)
3541

36-
-- | Listen for one `drain` event, call the callback, then detach
37-
-- | the `drain` event listener.
42+
-- | Listen for one `drain` event, call the callback, then remove
43+
-- | the event listener.
44+
-- |
45+
-- | Returns an effect for removing the event listener even if no event
46+
-- | was raised.
3847
foreign import onceDrain
3948
:: forall w
4049
. Writable w
4150
-> Effect Unit
42-
-> Effect Unit
51+
-> Effect (Effect Unit)
4352

53+
-- | Listen for one `error` event, call the callback, then remove
54+
-- | the event listener.
55+
-- |
56+
-- | Returns an effect for removing the event listener even if no event
57+
-- | was raised.
4458
foreign import onceError
4559
:: forall r
4660
. Stream r
4761
-> (Error -> Effect Unit)
48-
-> Effect Unit
62+
-> Effect (Effect Unit)
4963

5064
-- | Issue:
5165
-- | https://github.com/nodejs/node/issues/6379

0 commit comments

Comments
 (0)