Skip to content
This repository was archived by the owner on Aug 4, 2023. It is now read-only.

Commit d73f26a

Browse files
committed
writeStreamClose
1 parent a01691f commit d73f26a

File tree

4 files changed

+68
-36
lines changed

4 files changed

+68
-36
lines changed

src/Node/Stream/Aff.purs

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
-- | Internally the writing functions will call the
6666
-- | [`writable.write(chunk[, encoding][, callback])`](https://nodejs.org/docs/latest/api/stream.html#writablewritechunk-encoding-callback)
6767
-- | function on each of the `Buffer`s,
68-
-- | asychronously waiting if there is backpressure from the stream.
68+
-- | and will asychronously wait if there is backpressure from the stream.
6969
-- |
7070
-- | The writing functions will complete after all the data is flushed to the
7171
-- | stream.
@@ -77,14 +77,15 @@
7777
-- | The canceller argument is an action to perform in the event that
7878
-- | this `Aff` is cancelled.
7979
module Node.Stream.Aff
80-
( readAll
80+
( readSome
81+
, readSome_
82+
, readAll
8183
, readAll_
8284
, readN
8385
, readN_
84-
, readSome
85-
, readSome_
8686
, write
8787
, write_
88+
, writableClose
8889
)
8990
where
9091

@@ -98,14 +99,14 @@ import Data.Either (Either(..))
9899
import Data.Maybe (Maybe(..))
99100
import Data.Tuple (Tuple(..))
100101
import Effect (Effect, untilE)
101-
import Effect.Aff (effectCanceler, makeAff)
102+
import Effect.Aff (effectCanceler, makeAff, nonCanceler)
102103
import Effect.Aff.Class (class MonadAff, liftAff)
103104
import Effect.Exception (catchException)
104105
import Node.Buffer (Buffer)
105106
import Node.Buffer as Buffer
106107
import Node.Stream (Readable, Writable)
107108
import Node.Stream as Stream
108-
import Node.Stream.Aff.Internal (onceDrain, onceEnd, onceError, onceReadable, readable)
109+
import Node.Stream.Aff.Internal (onceDrain, onceEnd, onceError, onceReadable, readable, writeStreamClose)
109110

110111

111112
-- | Wait until there is some data available from the stream, then read it.
@@ -175,7 +176,7 @@ readSome_ r canceller = liftAff <<< makeAff $ \res -> do
175176
pure $ effectCanceler (canceller r)
176177

177178

178-
-- | Read all data until the end of the stream. Note that `stdin` will never end.
179+
-- | Read all data until the end of the stream. Note that __stdin__ will never end.
179180
readAll
180181
:: forall m r
181182
. MonadAff m
@@ -381,3 +382,21 @@ write_ w canceller bs = liftAff <<< makeAff $ \res -> do
381382
oneWrite
382383
removeError
383384
pure $ effectCanceler (canceller w)
385+
386+
-- | Close a `Writable` file stream.
387+
-- |
388+
-- | Will complete after the file stream is closed.
389+
writableClose
390+
:: forall m w
391+
. MonadAff m
392+
=> Writable w
393+
-> m Unit
394+
writableClose w = liftAff <<< makeAff $ \res -> do
395+
396+
removeError <- onceError w $ res <<< Left
397+
398+
writeStreamClose w do
399+
removeError
400+
res (Right unit)
401+
402+
pure nonCanceler

src/Node/Stream/Internal.js

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,27 @@
1-
import timers from 'node:timers';
2-
3-
export function onceReadable(s) {
4-
return f => () => {
5-
s.once('readable', f);
6-
return () => {s.removeListener('readable', f);};
7-
};
1+
export const onceReadable = s => f => () => {
2+
s.once('readable', f);
3+
return () => {s.removeListener('readable', f);};
84
}
95

10-
export function onceEnd(s) {
11-
return f => () => {
12-
s.once('end', f);
13-
return () => {s.removeListener('end', f);};
14-
};
6+
export const onceEnd = s => f => () => {
7+
s.once('end', f);
8+
return () => {s.removeListener('end', f);};
159
}
1610

17-
export function onceDrain(s) {
18-
return f => () => {
19-
s.once('drain', f);
20-
return () => {s.removeListener('drain', f);};
21-
};
11+
export const onceDrain = s => f => () => {
12+
s.once('drain', f);
13+
return () => {s.removeListener('drain', f);};
2214
}
2315

24-
export function onceError(s) {
25-
return f => () => {
26-
s.once('error', f);
27-
return () => {s.removeListener('error', f);};
28-
};
16+
export const onceError = s => f => () => {
17+
s.once('error', f);
18+
return () => {s.removeListener('error', f);};
2919
}
3020

31-
export function readable(s) {
32-
return () => {
21+
export const readable = s => () => {
3322
return s.readable;
34-
}
35-
}
23+
}
24+
25+
export const writeStreamClose = s => cb => () => {
26+
return s.close(cb);
27+
}

src/Node/Stream/Internal.purs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1+
-- | Maybe the stuff in here should be moved into the
2+
-- | [__Node.Stream__](https://pursuit.purescript.org/packages/purescript-node-streams/docs/Node.Stream)
3+
-- | module?
14
module Node.Stream.Aff.Internal
25
( onceDrain
36
, onceEnd
47
, onceError
58
, onceReadable
69
, readable
10+
, writeStreamClose
711
)
812
where
913

@@ -65,4 +69,14 @@ foreign import onceError
6569
foreign import readable
6670
:: forall r
6771
. Readable r
68-
-> Effect Boolean
72+
-> Effect Boolean
73+
74+
-- | The [`writeStream.close([callback])`](https://nodejs.org/api/fs.html#writestreamclosecallback)
75+
-- | function.
76+
-- |
77+
-- | Accepts a callback that will be executed once the writeStream is closed.
78+
foreign import writeStreamClose
79+
:: forall w
80+
. Writable w
81+
-> Effect Unit
82+
-> Effect Unit

test/Main.purs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ import Node.Buffer (Buffer, concat)
2121
import Node.Buffer as Buffer
2222
import Node.Encoding (Encoding(..))
2323
import Node.FS.Stream (createReadStream, createWriteStream)
24-
import Node.Stream.Aff (readAll, readN, readSome, write)
24+
import Node.Stream.Aff (readAll, readN, readSome, writableClose, write)
2525
import Partial.Unsafe (unsafePartial)
2626
import Test.Spec (describe, it)
27-
import Test.Spec.Assertions (shouldEqual)
27+
import Test.Spec.Assertions (expectError, shouldEqual)
2828
import Test.Spec.Reporter (consoleReporter)
2929
import Test.Spec.Runner (defaultConfig, runSpecT)
3030
import Unsafe.Coerce (unsafeCoerce)
@@ -57,5 +57,12 @@ main = unsafePartial $ do
5757
input :: Buffer <- liftEffect $ concat inputs
5858
inputSize <- liftEffect $ Buffer.size input
5959
shouldEqual (10 * magnitude) inputSize
60+
it "writes and closes" do
61+
let outfilename = "/tmp/test2.txt"
62+
outfile <- liftEffect $ createWriteStream outfilename
63+
b <- liftEffect $ Buffer.fromString "test" UTF8
64+
write outfile [b]
65+
writableClose outfile
66+
expectError $ write outfile [b]
6067

6168
pure (pure unit)

0 commit comments

Comments
 (0)