Skip to content
This repository was archived by the owner on Aug 4, 2023. It is now read-only.

Commit a01691f

Browse files
committed
Readers will not hang on eof, will return eof flag.
1 parent 9efb49f commit a01691f

File tree

6 files changed

+92
-46
lines changed

6 files changed

+92
-46
lines changed

spago.dhall

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ to generate this file without the comments in this block.
2222
, "either"
2323
, "maybe"
2424
, "prelude"
25+
, "tuples"
2526
]
2627
, packages = ./packages.dhall
2728
, sources = [ "src/**/*.purs" ]

src/Node/Stream/Aff.purs

Lines changed: 61 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
-- | Open process streams with
77
-- | [__Node.Process__](https://pursuit.purescript.org/packages/purescript-node-process/docs/Node.Process).
88
-- |
9+
-- | All I/O errors will be thrown through the `Aff` `MonadError` class
10+
-- | instance.
11+
-- |
912
-- | ## Reading
1013
-- |
1114
-- | #### Implementation
@@ -21,22 +24,29 @@
2124
-- | #### Results
2225
-- |
2326
-- | The result of a reading function may be chunked into more than one `Buffer`.
27+
-- | The `fst` element of the result `Tuple` is an `Array Buffer` of what
28+
-- | was read.
2429
-- | To concatenate the result into a single `Buffer`, use
2530
-- | `Node.Buffer.concat :: Array Buffer -> Buffer`.
2631
-- |
2732
-- | ```
28-
-- | input :: Buffer <- liftEffect <<< concat =<< readSome stdin
33+
-- | input :: Buffer <- liftEffect <<< concat <<< fst =<< readSome stdin
2934
-- | ```
3035
-- |
3136
-- | To calculate the number of bytes read, use
3237
-- | `Node.Buffer.size :: Buffer -> m Int`.
3338
-- |
3439
-- | ```
35-
-- | inputs :: Array Buffer <- readSome stdin
40+
-- | Tuple inputs _ :: Array Buffer <- readSome stdin
3641
-- | bytesRead :: Int
3742
-- | <- liftEffect $ Array.foldM (\a b -> (a+_) <$> size b) 0 inputs
3843
-- | ```
3944
-- |
45+
-- | The `snd` element of the result `Tuple` is a `Boolean` flag which
46+
-- | is `true` if the stream has not reached End-Of-File (and also if the stream
47+
-- | has not errored or been destroyed.) If the flag is `false` then
48+
-- | no more bytes will ever be produced by the stream.
49+
-- |
4050
-- | #### Canceller argument
4151
-- |
4252
-- | The reading functions suffixed with underscore take a canceller argument.
@@ -46,12 +56,6 @@
4656
-- | in the event that the `Aff` is cancelled pass `Node.Stream.destroy`
4757
-- | as the canceller.
4858
-- |
49-
-- | #### EOF
50-
-- |
51-
-- | There doesn’t seem to be any way to reliably detect when a stream has reached
52-
-- | its end? If any one of these reading functions is called on a stream
53-
-- | which has already reached its end, then the reading function will never complete.
54-
-- |
5559
-- | ## Writing
5660
-- |
5761
-- | #### Implementation
@@ -63,7 +67,7 @@
6367
-- | function on each of the `Buffer`s,
6468
-- | asychronously waiting if there is backpressure from the stream.
6569
-- |
66-
-- | The writing functions will complete after the data is flushed to the
70+
-- | The writing functions will complete after all the data is flushed to the
6771
-- | stream.
6872
-- |
6973
-- | #### Canceller argument
@@ -92,6 +96,7 @@ import Data.Array as Array
9296
import Data.Array.ST as Array.ST
9397
import Data.Either (Either(..))
9498
import Data.Maybe (Maybe(..))
99+
import Data.Tuple (Tuple(..))
95100
import Effect (Effect, untilE)
96101
import Effect.Aff (effectCanceler, makeAff)
97102
import Effect.Aff.Class (class MonadAff, liftAff)
@@ -100,25 +105,19 @@ import Node.Buffer (Buffer)
100105
import Node.Buffer as Buffer
101106
import Node.Stream (Readable, Writable)
102107
import Node.Stream as Stream
103-
import Node.Stream.Aff.Internal (onceDrain, onceEnd, onceError, onceReadable)
108+
import Node.Stream.Aff.Internal (onceDrain, onceEnd, onceError, onceReadable, readable)
104109

105110

106111
-- | Wait until there is some data available from the stream, then read it.
107112
-- |
108113
-- | This function is useful for streams like __stdin__ which never
109114
-- | reach End-Of-File.
110115
-- |
111-
-- | There is no way (?) to reliably detect with *Node.js*
112-
-- | when a stream has already reached its end, and if this
113-
-- | function is called after the stream has ended then the call will
114-
-- | never complete. So we can `readSome` one time and it will complete, but
115-
-- | if the stream reached its end then the next call to `readSome`
116-
-- | will never complete.
117116
readSome
118117
:: forall m r
119118
. MonadAff m
120119
=> Readable r
121-
-> m (Array Buffer)
120+
-> m (Tuple (Array Buffer) Boolean)
122121
readSome r = readSome_ r (\_ -> pure unit)
123122

124123
-- | __readSome__ with a canceller argument.
@@ -127,26 +126,33 @@ readSome_
127126
. MonadAff m
128127
=> Readable r
129128
-> (Readable r -> Effect Unit)
130-
-> m (Array Buffer)
129+
-> m (Tuple (Array Buffer) Boolean)
131130
readSome_ r canceller = liftAff <<< makeAff $ \res -> do
132131
bufs <- liftST $ Array.ST.new
133132

134133
removeError <- onceError r $ res <<< Left
135134

136135
-- try to read right away.
137136
catchException (res <<< Left) do
138-
untilE do
139-
Stream.read r Nothing >>= case _ of
140-
Nothing -> pure true
141-
Just chunk -> do
142-
void $ liftST $ Array.ST.push chunk bufs
143-
pure false
137+
ifM (readable r)
138+
do
139+
untilE do
140+
Stream.read r Nothing >>= case _ of
141+
Nothing -> pure true
142+
Just chunk -> do
143+
void $ liftST $ Array.ST.push chunk bufs
144+
pure false
145+
do
146+
removeError
147+
res (Right (Tuple [] false))
148+
144149

145150
ret1 <- liftST $ Array.ST.unsafeFreeze bufs
146151
if Array.length ret1 == 0 then do
147152
-- if we couldn't read anything right away, then wait until the stream is readable.
148153
-- “The 'readable' event will also be emitted once the end of the
149154
-- stream data has been reached but before the 'end' event is emitted.”
155+
-- We already checked the `readable` property so we don't have to check again.
150156
void $ onceReadable r do
151157
catchException (res <<< Left) do
152158
untilE do
@@ -157,12 +163,14 @@ readSome_ r canceller = liftAff <<< makeAff $ \res -> do
157163
pure false
158164
ret2 <- liftST $ Array.ST.unsafeFreeze bufs
159165
removeError
160-
res (Right ret2)
166+
readagain <- readable r
167+
res (Right (Tuple ret2 readagain))
161168

162169
-- return what we read right away
163170
else do
164171
removeError
165-
res (Right ret1)
172+
readagain <- readable r
173+
res (Right (Tuple ret1 readagain))
166174

167175
pure $ effectCanceler (canceller r)
168176

@@ -172,7 +180,7 @@ readAll
172180
:: forall m r
173181
. MonadAff m
174182
=> Readable r
175-
-> m (Array Buffer)
183+
-> m (Tuple (Array Buffer) Boolean)
176184
readAll r = readAll_ r (\_ -> pure unit)
177185

178186
-- | __readAll__ with a canceller argument.
@@ -181,7 +189,7 @@ readAll_
181189
. MonadAff m
182190
=> Readable r
183191
-> (Readable r -> Effect Unit)
184-
-> m (Array Buffer)
192+
-> m (Tuple (Array Buffer) Boolean)
185193
readAll_ r canceller = liftAff <<< makeAff $ \res -> do
186194
bufs <- liftST $ Array.ST.new
187195

@@ -190,7 +198,7 @@ readAll_ r canceller = liftAff <<< makeAff $ \res -> do
190198
removeEnd <- onceEnd r do
191199
removeError
192200
ret <- liftST $ Array.ST.unsafeFreeze bufs
193-
res (Right ret)
201+
res (Right (Tuple ret false))
194202

195203
let
196204
cleanupRethrow err = do
@@ -200,12 +208,18 @@ readAll_ r canceller = liftAff <<< makeAff $ \res -> do
200208

201209
-- try to read right away.
202210
catchException cleanupRethrow do
203-
untilE do
204-
Stream.read r Nothing >>= case _ of
205-
Nothing -> pure true
206-
Just chunk -> do
207-
void $ liftST $ Array.ST.push chunk bufs
208-
pure false
211+
ifM (readable r)
212+
do
213+
untilE do
214+
Stream.read r Nothing >>= case _ of
215+
Nothing -> pure true
216+
Just chunk -> do
217+
void $ liftST $ Array.ST.push chunk bufs
218+
pure false
219+
do
220+
removeError
221+
removeEnd
222+
res (Right (Tuple [] false))
209223

210224
-- then wait for the stream to be readable until the stream has ended.
211225
let
@@ -238,7 +252,7 @@ readN
238252
. MonadAff m
239253
=> Readable r
240254
-> Int
241-
-> m (Array Buffer)
255+
-> m (Tuple (Array Buffer) Boolean)
242256
readN r n = readN_ r (\_ -> pure unit) n
243257

244258
-- | __readN__ with a canceller argument.
@@ -248,7 +262,7 @@ readN_
248262
=> Readable r
249263
-> (Readable r -> Effect Unit)
250264
-> Int
251-
-> m (Array Buffer)
265+
-> m (Tuple (Array Buffer) Boolean)
252266
readN_ r canceller n = liftAff <<< makeAff $ \res -> do
253267
redRef <- liftST $ STRef.new 0
254268
bufs <- liftST $ Array.ST.new
@@ -261,7 +275,7 @@ readN_ r canceller n = liftAff <<< makeAff $ \res -> do
261275
removeEnd <- onceEnd r do
262276
removeError
263277
ret <- liftST $ Array.ST.unsafeFreeze bufs
264-
res (Right ret)
278+
res (Right (Tuple ret false))
265279

266280
let
267281
cleanupRethrow err = do
@@ -293,12 +307,19 @@ readN_ r canceller n = liftAff <<< makeAff $ \res -> do
293307
removeError
294308
removeEnd
295309
ret <- liftST $ Array.ST.unsafeFreeze bufs
296-
res (Right ret)
310+
readagain <- readable r
311+
res (Right (Tuple ret readagain))
297312
else
298313
continuation unit
299314

300315
-- try to read right away.
301-
tryToRead (\_ -> pure unit)
316+
ifM (readable r)
317+
do
318+
tryToRead (\_ -> pure unit)
319+
do
320+
removeError
321+
removeEnd
322+
res (Right (Tuple [] false))
302323

303324
-- if there were not enough bytes right away, then wait for bytes to come in.
304325
let

src/Node/Stream/Internal.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,9 @@ export function onceError(s) {
2727
return () => {s.removeListener('error', f);};
2828
};
2929
}
30+
31+
export function readable(s) {
32+
return () => {
33+
return s.readable;
34+
}
35+
}

src/Node/Stream/Internal.purs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module Node.Stream.Aff.Internal
33
, onceEnd
44
, onceError
55
, onceReadable
6+
, readable
67
)
78
where
89

@@ -55,3 +56,13 @@ foreign import onceError
5556
. Stream r
5657
-> (Error -> Effect Unit)
5758
-> Effect (Effect Unit)
59+
60+
-- | The [`readable.readable`](https://nodejs.org/api/stream.html#readablereadable)
61+
-- | property of a stream.
62+
-- |
63+
-- | > Is true if it is safe to call `readable.read()`, which means the stream
64+
-- | > has not been destroyed or emitted 'error' or 'end'.
65+
foreign import readable
66+
:: forall r
67+
. Readable r
68+
-> Effect Boolean

test/Main.purs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import Prelude
1212
import Data.Array as Array
1313
import Data.Either (Either(..))
1414
import Data.Maybe (Maybe(..))
15+
import Data.Tuple (Tuple(..), fst)
1516
import Effect (Effect)
1617
import Effect.Aff (Error, Milliseconds(..), runAff_)
1718
import Effect.Class (liftEffect)
@@ -45,11 +46,16 @@ main = unsafePartial $ do
4546
outstring <- liftEffect $ Buffer.fromString "aaaaaaaaaa" UTF8
4647
write outfile $ Array.replicate magnitude outstring
4748
infile <- liftEffect $ createReadStream outfilename
48-
input1 <- readSome infile
49-
input2 <- readN infile (5 * magnitude)
50-
input3 <- readAll infile
49+
Tuple input1 _ <- readSome infile
50+
Tuple input2 _ <- readN infile (5 * magnitude)
51+
Tuple input3 readagain <- readAll infile
52+
shouldEqual readagain false
53+
_ :: Buffer <- liftEffect <<< concat <<< fst =<< readSome infile
54+
void $ readN infile 1
55+
void $ readAll infile
5156
let inputs = input1 <> input2 <> input3
5257
input :: Buffer <- liftEffect $ concat inputs
5358
inputSize <- liftEffect $ Buffer.size input
5459
shouldEqual (10 * magnitude) inputSize
60+
5561
pure (pure unit)

test/Main3.purs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import Prelude
99

1010
import Data.Array as Array
1111
import Data.Either (Either(..))
12+
import Data.Tuple (Tuple(..))
1213
import Effect (Effect)
1314
import Effect.Aff (Error, runAff_)
1415
import Effect.Class (liftEffect)
@@ -37,11 +38,11 @@ main = unsafePartial $ do
3738
describe "Node.Stream.Aff" do
3839
it "reads 1" do
3940
infile <- liftEffect $ createReadStream =<< pure <<< flip Array.unsafeIndex 2 =<< argv
40-
inputs1 <- readN infile 500000
41+
Tuple inputs1 _ <- readN infile 500000
4142
bytesRead1 :: Int <- liftEffect $ Array.foldM (\a b -> (a+_) <$> Buffer.size b) 0 inputs1
4243
shouldEqual 500000 bytesRead1
43-
inputs2 <- readSome infile
44-
inputs3 <- readAll infile
44+
Tuple inputs2 _ <- readSome infile
45+
Tuple inputs3 _ <- readAll infile
4546
let inputs = inputs1 <> inputs2 <> inputs3
4647
-- TODO read after EOF will hang
4748
-- inputs4 <- readAll infile

0 commit comments

Comments
 (0)