Skip to content
This repository was archived by the owner on Aug 4, 2023. It is now read-only.

Commit 427defd

Browse files
committed
Aff cancellation works properly.
1 parent bca84f0 commit 427defd

File tree

3 files changed

+75
-92
lines changed

3 files changed

+75
-92
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11

2+
# v2.0.0
3+
4+
* Aff cancellation is correctly handled.
5+
26
# v1.1.0
37

48
* `write` will throw errors after `drain` event.

src/Node/Stream/Aff.purs

Lines changed: 62 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
1-
-- | Asynchronous I/O with [*Node.js* Stream](https://nodejs.org/docs/latest/api/stream.html).
1+
-- | Asynchronous I/O with the [*Node.js* Stream API](https://nodejs.org/docs/latest/api/stream.html).
22
-- |
3-
-- | Open file streams with
3+
-- | Open __file streams__ with
44
-- | [__Node.FS.Stream__](https://pursuit.purescript.org/packages/purescript-node-fs/docs/Node.FS.Stream).
55
-- |
6-
-- | Open process streams with
6+
-- | Open __process streams__ with
77
-- | [__Node.Process__](https://pursuit.purescript.org/packages/purescript-node-process/docs/Node.Process).
88
-- |
9-
-- | Read and write strings with the __toString__ and __fromString__ functions in
9+
-- | Read and write __`String`__s with the `toString` and `fromString` functions in
1010
-- | [__Node.Buffer__](https://pursuit.purescript.org/packages/purescript-node-buffer/docs/Node.Buffer#t:MutableBuffer).
1111
-- |
12-
-- | All I/O errors will be thrown through the `Aff` `MonadError` class
12+
-- | All __I/O errors__ will be thrown through the `Aff` `MonadError` class
1313
-- | instance.
1414
-- |
15+
-- | `Aff` __cancellation__ will clean up all *Node.js* event listeners.
16+
-- |
17+
-- | All of these `Aff` functions will prevent the *Node.js* __event loop__ from
18+
-- | exiting until the `Aff` function completes.
19+
-- |
1520
-- | ## Reading
1621
-- |
1722
-- | #### Implementation
@@ -56,14 +61,7 @@
5661
-- | Reading from an ended, closed, errored, or destroyed stream
5762
-- | will complete immediately with `Tuple [] false`.
5863
-- |
59-
-- | #### Canceller argument
60-
-- |
61-
-- | The reading functions suffixed with underscore take a canceller argument.
62-
-- |
63-
-- | The canceller argument is an action to perform in the event that
64-
-- | this `Aff` is cancelled. For example, to destroy the stream
65-
-- | in the event that the `Aff` is cancelled pass `Node.Stream.destroy`
66-
-- | as the canceller.
64+
-- | The `readagain` flag will give the same answer as a call to `Internal.readable`.
6765
-- |
6866
-- | ## Writing
6967
-- |
@@ -78,37 +76,26 @@
7876
-- |
7977
-- | The writing functions will complete after all the data is flushed to the
8078
-- | stream.
81-
-- |
82-
-- | #### Canceller argument
83-
-- |
84-
-- | The writing functions suffixed with underscore take a canceller argument.
85-
-- |
86-
-- | The canceller argument is an action to perform in the event that
87-
-- | this `Aff` is cancelled.
8879
module Node.Stream.Aff
8980
( readSome
90-
, readSome_
9181
, readAll
92-
, readAll_
9382
, readN
94-
, readN_
9583
, write
96-
, write_
9784
, writableClose
9885
)
9986
where
10087

10188
import Prelude
10289

10390
import Control.Monad.ST.Class (liftST)
104-
import Control.Monad.ST.Ref as STRef
91+
import Control.Monad.ST.Ref as ST.Ref
10592
import Data.Array as Array
10693
import Data.Array.ST as Array.ST
10794
import Data.Either (Either(..))
10895
import Data.Maybe (Maybe(..))
10996
import Data.Tuple (Tuple(..))
11097
import Effect (Effect, untilE)
111-
import Effect.Aff (effectCanceler, makeAff, nonCanceler)
98+
import Effect.Aff (effectCanceler, makeAff)
11299
import Effect.Aff.Class (class MonadAff, liftAff)
113100
import Effect.Exception (catchException)
114101
import Node.Buffer (Buffer)
@@ -128,16 +115,7 @@ readSome
128115
. MonadAff m
129116
=> Readable r
130117
-> m (Tuple (Array Buffer) Boolean)
131-
readSome r = readSome_ r (\_ -> pure unit)
132-
133-
-- | __readSome__ with a canceller argument.
134-
readSome_
135-
:: forall m r
136-
. MonadAff m
137-
=> Readable r
138-
-> (Readable r -> Effect Unit)
139-
-> m (Tuple (Array Buffer) Boolean)
140-
readSome_ r canceller = liftAff <<< makeAff $ \res -> do
118+
readSome r = liftAff <<< makeAff $ \res -> do
141119
bufs <- liftST $ Array.ST.new
142120

143121
removeError <- onceError r $ res <<< Left
@@ -158,12 +136,12 @@ readSome_ r canceller = liftAff <<< makeAff $ \res -> do
158136

159137

160138
ret1 <- liftST $ Array.ST.unsafeFreeze bufs
161-
if Array.length ret1 == 0 then do
139+
removeReadable <- if Array.length ret1 == 0 then do
162140
-- if we couldn't read anything right away, then wait until the stream is readable.
163141
-- “The 'readable' event will also be emitted once the end of the
164142
-- stream data has been reached but before the 'end' event is emitted.”
165143
-- We already checked the `readable` property so we don't have to check again.
166-
void $ onceReadable r do
144+
onceReadable r do
167145
catchException (res <<< Left) do
168146
untilE do
169147
Stream.read r Nothing >>= case _ of
@@ -181,27 +159,26 @@ readSome_ r canceller = liftAff <<< makeAff $ \res -> do
181159
removeError
182160
readagain <- readable r
183161
res (Right (Tuple ret1 readagain))
162+
pure (pure unit) -- dummy canceller
184163

185-
pure $ effectCanceler (canceller r)
164+
-- canceller might by called while waiting for `onceReadable`
165+
pure $ effectCanceler do
166+
removeError
167+
removeReadable
186168

187169

188-
-- | Read all data until the end of the stream. Note that __stdin__ will never end.
189-
readAll
190-
:: forall m r
191-
. MonadAff m
192-
=> Readable r
193-
-> m (Tuple (Array Buffer) Boolean)
194-
readAll r = readAll_ r (\_ -> pure unit)
195170

196-
-- | __readAll__ with a canceller argument.
197-
readAll_
171+
-- | Read all data until the end of the stream.
172+
-- |
173+
-- | Note that __stdin__ will never end.
174+
readAll
198175
:: forall m r
199176
. MonadAff m
200177
=> Readable r
201-
-> (Readable r -> Effect Unit)
202178
-> m (Tuple (Array Buffer) Boolean)
203-
readAll_ r canceller = liftAff <<< makeAff $ \res -> do
179+
readAll r = liftAff <<< makeAff $ \res -> do
204180
bufs <- liftST $ Array.ST.new
181+
removeReadable <- liftST $ ST.Ref.new (pure unit :: Effect Unit)
205182

206183
removeError <- onceError r $ res <<< Left
207184

@@ -214,6 +191,7 @@ readAll_ r canceller = liftAff <<< makeAff $ \res -> do
214191
cleanupRethrow err = do
215192
removeError
216193
removeEnd
194+
join $ liftST $ ST.Ref.read removeReadable
217195
res (Left err)
218196

219197
-- try to read right away.
@@ -234,7 +212,7 @@ readAll_ r canceller = liftAff <<< makeAff $ \res -> do
234212
-- then wait for the stream to be readable until the stream has ended.
235213
let
236214
waitToRead = do
237-
void $ onceReadable r do
215+
removeReadable' <- onceReadable r do
238216
-- “The 'readable' event will also be emitted once the end of the
239217
-- stream data has been reached but before the 'end' event is emitted.”
240218
catchException cleanupRethrow do
@@ -245,9 +223,15 @@ readAll_ r canceller = liftAff <<< makeAff $ \res -> do
245223
_ <- liftST $ Array.ST.push chunk bufs
246224
pure false
247225
waitToRead -- this is not recursion
226+
void $ liftST $ ST.Ref.write removeReadable' removeReadable
248227

249228
waitToRead
250-
pure $ effectCanceler (canceller r)
229+
230+
-- canceller might by called while waiting for `onceReadable`
231+
pure $ effectCanceler do
232+
removeError
233+
removeEnd
234+
join $ liftST $ ST.Ref.read removeReadable
251235

252236

253237
-- | Wait for *N* bytes to become available from the stream.
@@ -263,19 +247,10 @@ readN
263247
=> Readable r
264248
-> Int
265249
-> m (Tuple (Array Buffer) Boolean)
266-
readN r n = readN_ r (\_ -> pure unit) n
267-
268-
-- | __readN__ with a canceller argument.
269-
readN_
270-
:: forall m r
271-
. MonadAff m
272-
=> Readable r
273-
-> (Readable r -> Effect Unit)
274-
-> Int
275-
-> m (Tuple (Array Buffer) Boolean)
276-
readN_ r canceller n = liftAff <<< makeAff $ \res -> do
277-
redRef <- liftST $ STRef.new 0
250+
readN r n = liftAff <<< makeAff $ \res -> do
251+
redRef <- liftST $ ST.Ref.new 0
278252
bufs <- liftST $ Array.ST.new
253+
removeReadable <- liftST $ ST.Ref.new (pure unit :: Effect Unit)
279254

280255
-- TODO on error, we're not calling removeEnd...
281256
removeError <- onceError r $ res <<< Left
@@ -284,20 +259,22 @@ readN_ r canceller n = liftAff <<< makeAff $ \res -> do
284259
-- if there are more bytes in the stream?
285260
removeEnd <- onceEnd r do
286261
removeError
262+
-- join $ liftST $ ST.Ref.read removeReadable
287263
ret <- liftST $ Array.ST.unsafeFreeze bufs
288264
res (Right (Tuple ret false))
289265

290266
let
291267
cleanupRethrow err = do
292268
removeError
293269
removeEnd
270+
join $ liftST $ ST.Ref.read removeReadable
294271
res (Left err)
295272

296273
-- try to read N bytes and then either return N bytes or run a continuation
297274
tryToRead continuation = do
298275
catchException cleanupRethrow do
299276
untilE do
300-
red <- liftST $ STRef.read redRef
277+
red <- liftST $ ST.Ref.read redRef
301278
-- https://nodejs.org/docs/latest-v15.x/api/stream.html#stream_readable_read_size
302279
-- “If size bytes are not available to be read, null will be returned
303280
-- unless the stream has ended, in which case all of the data remaining
@@ -307,12 +284,12 @@ readN_ r canceller n = liftAff <<< makeAff $ \res -> do
307284
Just chunk -> do
308285
_ <- liftST $ Array.ST.push chunk bufs
309286
s <- Buffer.size chunk
310-
red' <- liftST $ STRef.modify (_+s) redRef
287+
red' <- liftST $ ST.Ref.modify (_+s) redRef
311288
if red' >= n then
312289
pure true
313290
else
314291
pure false
315-
red <- liftST $ STRef.read redRef
292+
red <- liftST $ ST.Ref.read redRef
316293
if red >= n then do
317294
removeError
318295
removeEnd
@@ -334,11 +311,16 @@ readN_ r canceller n = liftAff <<< makeAff $ \res -> do
334311
-- if there were not enough bytes right away, then wait for bytes to come in.
335312
let
336313
waitToRead _ = do
337-
void $ onceReadable r do
314+
removeReadable' <- onceReadable r do
338315
tryToRead waitToRead
316+
void $ liftST $ ST.Ref.write removeReadable' removeReadable
339317
waitToRead unit
340318

341-
pure $ effectCanceler (canceller r)
319+
-- canceller might by called while waiting for `onceReadable`
320+
pure $ effectCanceler do
321+
removeError
322+
removeEnd
323+
join $ liftST $ ST.Ref.read removeReadable
342324

343325

344326
-- | Write to a stream.
@@ -350,18 +332,9 @@ write
350332
=> Writable w
351333
-> Array Buffer
352334
-> m Unit
353-
write w bs = write_ w (\_ -> pure unit) bs
354-
355-
-- | __write__ with a canceller argument.
356-
write_
357-
:: forall m w
358-
. MonadAff m
359-
=> Writable w
360-
-> (Writable w -> Effect Unit)
361-
-> Array Buffer
362-
-> m Unit
363-
write_ w canceller bs = liftAff <<< makeAff $ \res -> do
335+
write w bs = liftAff <<< makeAff $ \res -> do
364336
bufs <- liftST $ Array.ST.thaw bs
337+
removeDrain <- liftST $ ST.Ref.new (pure unit :: Effect Unit)
365338

366339
removeError <- onceError w $ res <<< Left
367340

@@ -391,11 +364,16 @@ write_ w canceller bs = liftAff <<< makeAff $ \res -> do
391364
if nobackpressure then do
392365
pure false
393366
else do
394-
_ <- onceDrain w oneWrite
367+
removeDrain' <- onceDrain w oneWrite
368+
void $ liftST $ ST.Ref.write removeDrain' removeDrain
395369
pure true
396370

397371
oneWrite
398-
pure $ effectCanceler (canceller w)
372+
373+
-- canceller might be called while waiting for `onceDrain`
374+
pure $ effectCanceler do
375+
removeError
376+
join $ liftST $ ST.Ref.read removeDrain
399377

400378
-- | Close a `Writable` file stream.
401379
-- |
@@ -413,4 +391,5 @@ writableClose w = liftAff <<< makeAff $ \res -> do
413391
removeError
414392
res (Right unit)
415393

416-
pure nonCanceler
394+
pure $ effectCanceler do
395+
removeError

src/Node/Stream/Internal.purs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ import Node.Stream (Readable, Stream, Writable)
2020
-- | Listen for one `readable` event, call the callback, then remove
2121
-- | the event listener.
2222
-- |
23-
-- | Returns an effect for removing the event listener even if no event
24-
-- | was raised.
23+
-- | Returns an effect for removing the event listener before the event
24+
-- | is raised.
2525
foreign import onceReadable
2626
:: forall r
2727
. Readable r
@@ -31,8 +31,8 @@ foreign import onceReadable
3131
-- | Listen for one `end` event, call the callback, then remove
3232
-- | the event listener.
3333
-- |
34-
-- | Returns an effect for removing the event listener even if no event
35-
-- | was raised.
34+
-- | Returns an effect for removing the event listener before the event
35+
-- | is raised.
3636
foreign import onceEnd
3737
:: forall r
3838
. Readable r
@@ -42,8 +42,8 @@ foreign import onceEnd
4242
-- | Listen for one `drain` event, call the callback, then remove
4343
-- | the event listener.
4444
-- |
45-
-- | Returns an effect for removing the event listener even if no event
46-
-- | was raised.
45+
-- | Returns an effect for removing the event listener before the event
46+
-- | is raised.
4747
foreign import onceDrain
4848
:: forall w
4949
. Writable w
@@ -53,8 +53,8 @@ foreign import onceDrain
5353
-- | Listen for one `error` event, call the callback, then remove
5454
-- | the event listener.
5555
-- |
56-
-- | Returns an effect for removing the event listener even if no event
57-
-- | was raised.
56+
-- | Returns an effect for removing the event listener before the event
57+
-- | is raised.
5858
foreign import onceError
5959
:: forall r
6060
. Stream r
@@ -74,7 +74,7 @@ foreign import readable
7474
-- | The [`writeStream.close([callback])`](https://nodejs.org/api/fs.html#writestreamclosecallback)
7575
-- | function.
7676
-- |
77-
-- | Accepts a callback that will be executed once the writeStream is closed.
77+
-- | Accepts a callback that will be executed when the writeStream has closed.
7878
foreign import writeStreamClose
7979
:: forall w
8080
. Writable w

0 commit comments

Comments
 (0)