Skip to content
This repository was archived by the owner on Aug 4, 2023. It is now read-only.

Commit 3655c41

Browse files
committed
WIP
1 parent 5cfc996 commit 3655c41

File tree

6 files changed

+195
-50
lines changed

6 files changed

+195
-50
lines changed

spago.dhall

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ to generate this file without the comments in this block.
2222
, "either"
2323
, "maybe"
2424
, "prelude"
25+
-- , "js-timers"
2526
]
2627
, packages = ./packages.dhall
2728
, sources = [ "src/**/*.purs" ]

src/Node/Stream/Aff.purs

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,18 +73,19 @@
7373
-- | The canceller argument is an action to perform in the event that
7474
-- | this `Aff` is cancelled.
7575
module Node.Stream.Aff
76-
( readSome
77-
, readSome_
76+
( module Reexport
7877
, readAll
7978
, readAll_
8079
, readN
8180
, readN_
81+
, readSome
82+
, readSome_
8283
, write
84+
, write'
8385
, write_
84-
, module Reexport
86+
, noExit
8587
)
86-
87-
where
88+
where
8889

8990
import Prelude
9091

@@ -97,11 +98,14 @@ import Data.Maybe (Maybe(..))
9798
import Effect (Effect, untilE)
9899
import Effect.Aff (effectCanceler, makeAff)
99100
import Effect.Aff.Class (class MonadAff, liftAff)
101+
import Effect.Console as Console
100102
import Effect.Exception (catchException)
101103
import Node.Buffer (Buffer)
102104
import Node.Buffer as Buffer
105+
import Node.Encoding (Encoding(..))
103106
import Node.Stream (Readable, Writable)
104107
import Node.Stream as Stream
108+
import Node.Stream.Aff.Internal (clearInterval, hasRef, setInterval)
105109
import Node.Stream.Aff.Internal (onceDrain, onceEnd, onceError, onceReadable)
106110
import Node.Stream.Aff.Internal (unbuffer) as Reexport
107111

@@ -307,3 +311,28 @@ write_ w canceller bs = liftAff <<< makeAff $ \res -> do
307311

308312
oneWrite
309313
pure $ effectCanceler (canceller w)
314+
315+
-- | https://github.com/purescript-contrib/pulp/blob/79dd954c86a5adc57051cad127c8888756f680a6/src/Pulp/System/Stream.purs#L41
316+
write' :: forall m w. MonadAff m => Writable w -> String -> m Unit
317+
write' stream str = liftAff $ makeAff (\cb -> mempty <* void (Stream.writeString stream UTF8 str (\_ -> cb (Right unit))))
318+
319+
320+
-- | Prevent Node.js from exiting while an `Effect` is running.
321+
noExit :: Effect (Effect Unit)
322+
-- noExit :: forall a. (Effect unit -> Effect a) -> Effect a
323+
noExit = do
324+
-- Idea from
325+
-- https://stackoverflow.com/a/62869265/187223
326+
327+
id <- setInterval 1000 (pure unit)
328+
pure (clearInterval id)
329+
330+
-- id <- setInterval 1 (Console.log "INTERVAL\n")
331+
-- h <- hasRef id
332+
-- Console.log $ "TIMEOUT " <> show h
333+
-- pure do
334+
-- Console.log "INTERVAL END\n"
335+
-- clearInterval id
336+
337+
-- id <- setTimeout 1 do
338+
-- Console.log "TIMEOUT\n"

src/Node/Stream/Internal.js

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import timers from 'node:timers';
2+
13
export function onceReadable(s) {
24
return f => () => {
35
s.once('readable', f);
@@ -23,14 +25,31 @@ export function onceError(s) {
2325
}
2426

2527
export function unbuffer(s) {
26-
// // https://github.com/nodejs/node/issues/6456
27-
// return () => {
28-
// s && s.isTTY && s._handle && s._handle.setBlocking && s._handle.setBlocking(true);
29-
// };
30-
28+
// https://github.com/nodejs/node/issues/6456
3129
// https://github.com/nodejs/node/issues/6379#issuecomment-1064044886
30+
// https://nodejs.org/api/process.html#a-note-on-process-io
31+
//
32+
// Maybe the stream promise API doesn't have this problem?
33+
// https://github.com/sparksuite/waterfall-cli/issues/258
3234
return () => {
33-
// s && s._handle && s._handle.setBlocking && s._handle.setBlocking(true);
3435
s._handle.setBlocking(true);
3536
};
37+
}
38+
39+
export function setInterval(t) {
40+
return f => () => {
41+
return timers.setInterval(f, t);
42+
}
43+
}
44+
45+
export function clearInterval(timeout) {
46+
return () => {
47+
timers.clearInterval(timeout);
48+
}
49+
}
50+
51+
export function hasRef(timeout) {
52+
return () => {
53+
return timeout.hasRef();
54+
}
3655
}

src/Node/Stream/Internal.purs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
module Node.Stream.Aff.Internal
2-
( onceReadable
3-
, onceEnd
2+
( Timeout
3+
, clearInterval
4+
, hasRef
45
, onceDrain
6+
, onceEnd
57
, onceError
8+
, onceReadable
9+
, setInterval
610
, unbuffer
711
)
8-
where
12+
where
913

1014
import Prelude
1115

@@ -48,8 +52,19 @@ foreign import onceError
4852
-- |
4953
-- | Implementation:
5054
-- | https://github.com/nodejs/node/issues/6456
55+
-- |
56+
-- | If this fails then it will throw an `Error`.
5157
foreign import unbuffer
5258
:: forall w
5359
. Writable w
5460
-> Effect Unit
5561
-- foreign import stdoutUnbuffer :: Effect Unit
62+
63+
-- | https://nodejs.org/api/timers.html#class-timeout
64+
foreign import data Timeout :: Type
65+
66+
foreign import setInterval :: Int -> Effect Unit -> Effect Timeout
67+
68+
foreign import clearInterval :: Timeout -> Effect Unit
69+
70+
foreign import hasRef :: Timeout -> Effect Boolean

test/Main.purs

Lines changed: 56 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -10,56 +10,76 @@ module Test.Main where
1010
import Prelude
1111

1212
import Data.Array as Array
13-
import Data.Either (either)
13+
import Data.Either (Either(..), either)
1414
import Effect (Effect)
15-
import Effect.Aff (runAff_)
15+
import Effect.Aff (Error, Milliseconds(..), delay, runAff_)
1616
import Effect.Class (liftEffect)
1717
import Effect.Class.Console as Console
1818
import Node.Buffer (Buffer, concat)
1919
import Node.Buffer as Buffer
2020
import Node.Encoding (Encoding(..))
2121
import Node.FS.Stream (createReadStream, createWriteStream)
22-
import Node.Process (argv)
23-
import Node.Stream.Aff (readAll, readN, readSome, write)
22+
import Node.Process (argv, stderr, stdout)
23+
import Node.Stream.Aff (noExit, readAll, readN, readSome, unbuffer, write, write')
2424
import Partial.Unsafe (unsafePartial)
2525
import Test.Spec (describe, it)
2626
import Test.Spec.Assertions (shouldEqual)
2727
import Test.Spec.Reporter (consoleReporter)
2828
import Test.Spec.Runner (runSpec)
2929
import Unsafe.Coerce (unsafeCoerce)
3030

31+
completion :: Either Error (Effect Unit) -> Effect Unit
32+
completion = case _ of
33+
Left e -> Console.error (unsafeCoerce e)
34+
Right f -> f
35+
3136
main :: Effect Unit
3237
main = unsafePartial $ do
33-
runAff_ (either (unsafeCoerce >>> Console.error) (\_ -> pure unit)) $ runSpec [consoleReporter] do
34-
describe "Node.Stream.Aff" do
35-
it "reads 1" do
36-
infile <- liftEffect $ createReadStream =<< pure <<< flip Array.unsafeIndex 2 =<< argv
37-
inputs1 <- readN infile 500000
38-
bytesRead1 :: Int <- liftEffect $ Array.foldM (\a b -> (a+_) <$> Buffer.size b) 0 inputs1
39-
shouldEqual 500000 bytesRead1
40-
inputs2 <- readSome infile
41-
inputs3 <- readAll infile
42-
let inputs = inputs1 <> inputs2 <> inputs3
43-
-- TODO read after EOF will hang
44-
-- inputs4 <- readAll infile
45-
-- inputs4 <- readSome infile
46-
-- inputs4 <- readN infile 10
47-
-- let inputs = inputs1 <> inputs2 <> inputs3 <> inputs4
48-
bytesRead :: Int
49-
<- liftEffect $ Array.foldM (\a b -> (a+_) <$> Buffer.size b) 0 inputs
50-
shouldEqual 1000000 bytesRead
51-
input :: Buffer <- liftEffect $ concat inputs
52-
inputSize <- liftEffect $ Buffer.size input
53-
shouldEqual 1000000 inputSize
54-
it "writes 1" do
55-
let outfilename = "outfile.txt"
56-
outfile <- liftEffect $ createWriteStream outfilename
57-
outstring <- liftEffect $ Buffer.fromString "aaaaaaaaaa" UTF8
58-
write outfile $ Array.replicate 1000 outstring
59-
infile <- liftEffect $ createReadStream outfilename
60-
inputs <- readAll infile
61-
input :: Buffer <- liftEffect $ concat inputs
62-
inputSize <- liftEffect $ Buffer.size input
63-
shouldEqual 10000 inputSize
64-
38+
-- Console.log $ unsafeCoerce stdout
39+
-- unbuffer stdout
40+
-- Console.log $ unsafeCoerce stdout
6541

42+
-- runAff_ (either (unsafeCoerce >>> Console.error) (\_ -> pure unit)) do
43+
exiter <- noExit
44+
runAff_ completion do
45+
write' stdout "ENTER \n"
46+
runSpec [consoleReporter] do
47+
describe "Node.Stream.Aff" do
48+
it "reads 1" do
49+
infile <- liftEffect $ createReadStream =<< pure <<< flip Array.unsafeIndex 2 =<< argv
50+
inputs1 <- readN infile 500000
51+
bytesRead1 :: Int <- liftEffect $ Array.foldM (\a b -> (a+_) <$> Buffer.size b) 0 inputs1
52+
shouldEqual 500000 bytesRead1
53+
inputs2 <- readSome infile
54+
inputs3 <- readAll infile
55+
let inputs = inputs1 <> inputs2 <> inputs3
56+
-- TODO read after EOF will hang
57+
-- inputs4 <- readAll infile
58+
-- inputs4 <- readSome infile
59+
-- inputs4 <- readN infile 10
60+
-- let inputs = inputs1 <> inputs2 <> inputs3 <> inputs4
61+
bytesRead :: Int
62+
<- liftEffect $ Array.foldM (\a b -> (a+_) <$> Buffer.size b) 0 inputs
63+
shouldEqual 1000000 bytesRead
64+
input :: Buffer <- liftEffect $ concat inputs
65+
inputSize <- liftEffect $ Buffer.size input
66+
shouldEqual 1000000 inputSize
67+
it "writes 1" do
68+
let outfilename = "outfile.txt"
69+
outfile <- liftEffect $ createWriteStream outfilename
70+
outstring <- liftEffect $ Buffer.fromString "aaaaaaaaaa" UTF8
71+
write outfile $ Array.replicate 1000 outstring
72+
infile <- liftEffect $ createReadStream outfilename
73+
inputs <- readAll infile
74+
input :: Buffer <- liftEffect $ concat inputs
75+
inputSize <- liftEffect $ Buffer.size input
76+
shouldEqual 10000 inputSize
77+
do
78+
-- b <- liftEffect $ Buffer.create 0
79+
b <- liftEffect $ Buffer.fromString "EXIT \n" UTF8
80+
write stdout $ Array.replicate 10 b
81+
-- delay (Milliseconds 2000.0)
82+
-- write' stdout "EXIT\n"
83+
-- liftEffect exiter
84+
-- pure $ Console.log "log completion"
85+
pure (pure unit)

test/Main2.purs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
-- | How to test:
2+
-- |
3+
-- | ```
4+
-- | spago -x spago-dev.dhall test --main Test2 | wc -c
5+
-- | ```
6+
-- |
7+
-- | We want to read from a file, not stdin, because stdin has no EOF.
8+
module Test2 where
9+
10+
import Prelude
11+
12+
import Data.Array as Array
13+
import Data.Either (Either(..), either)
14+
import Effect (Effect)
15+
import Effect.Aff (Error, Milliseconds(..), delay, runAff_)
16+
import Effect.Class (liftEffect)
17+
import Effect.Class.Console as Console
18+
import Node.Buffer (Buffer, concat)
19+
import Node.Buffer as Buffer
20+
import Node.Encoding (Encoding(..))
21+
import Node.FS.Stream (createReadStream, createWriteStream)
22+
import Node.Process (argv, stderr, stdout, stdoutIsTTY)
23+
import Node.Stream.Aff (noExit, readAll, readN, readSome, unbuffer, write, write')
24+
import Partial.Unsafe (unsafePartial)
25+
import Test.Spec (describe, it)
26+
import Test.Spec.Assertions (shouldEqual)
27+
import Test.Spec.Reporter (consoleReporter)
28+
import Test.Spec.Runner (runSpec)
29+
import Unsafe.Coerce (unsafeCoerce)
30+
31+
completion :: Either Error (Effect Unit) -> Effect Unit
32+
completion = case _ of
33+
Left e -> Console.error (unsafeCoerce e)
34+
Right f -> f
35+
36+
37+
main :: Effect Unit
38+
main = unsafePartial $ do
39+
-- Console.log $ unsafeCoerce stdout
40+
-- unbuffer stdout
41+
-- Console.log $ unsafeCoerce stdout
42+
43+
-- runAff_ (either (unsafeCoerce >>> Console.error) (\_ -> pure unit)) do
44+
45+
Console.log $ "stdoutIsTTY " <> show stdoutIsTTY <> "\n"
46+
47+
exiter <- noExit
48+
49+
runAff_ completion do
50+
-- write' stdout "ENTER \n"
51+
-- delay (Milliseconds 2000.0)
52+
do
53+
-- b <- liftEffect $ Buffer.create 0
54+
b <- liftEffect $ Buffer.fromString "aaaaaaaaaa" UTF8
55+
-- write stdout $ Array.replicate 100000 b
56+
write stdout $ Array.replicate 1 b
57+
-- write stdout [b]
58+
-- write' stdout "EXIT\n"
59+
liftEffect exiter
60+
-- pure $ Console.log "log completion"
61+
pure (pure unit)

0 commit comments

Comments
 (0)