Skip to content
This repository was archived by the owner on Aug 4, 2023. It is now read-only.

Commit 9efb49f

Browse files
committed
Cleanup
1 parent 403cd99 commit 9efb49f

File tree

8 files changed

+88
-226
lines changed

8 files changed

+88
-226
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ jobs:
3535
run: spago -x spago-dev.dhall install
3636

3737
- name: Run tests
38-
run: spago -x spago-dev.dhall test --no-install --exec-args <(head --bytes 1000000 /dev/zero)
38+
run: spago -x spago-dev.dhall test --no-install
3939

4040
# - name: Check formatting
4141
# run: purs-tidy check src test

spago.dhall

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ to generate this file without the comments in this block.
2222
, "either"
2323
, "maybe"
2424
, "prelude"
25-
-- , "js-timers"
2625
]
2726
, packages = ./packages.dhall
2827
, sources = [ "src/**/*.purs" ]

src/Node/Stream/Aff.purs

Lines changed: 9 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -73,17 +73,14 @@
7373
-- | The canceller argument is an action to perform in the event that
7474
-- | this `Aff` is cancelled.
7575
module Node.Stream.Aff
76-
( module Reexport
77-
, readAll
76+
( readAll
7877
, readAll_
7978
, readN
8079
, readN_
8180
, readSome
8281
, readSome_
8382
, write
84-
, write'
8583
, write_
86-
, noExit
8784
)
8885
where
8986

@@ -101,29 +98,29 @@ import Effect.Aff.Class (class MonadAff, liftAff)
10198
import Effect.Exception (catchException)
10299
import Node.Buffer (Buffer)
103100
import Node.Buffer as Buffer
104-
import Node.Encoding (Encoding(..))
105101
import Node.Stream (Readable, Writable)
106102
import Node.Stream as Stream
107-
import Node.Stream.Aff.Internal (clearInterval, hasRef, onceReadable, setInterval)
108103
import Node.Stream.Aff.Internal (onceDrain, onceEnd, onceError, onceReadable)
109-
import Node.Stream.Aff.Internal (unbuffer) as Reexport
110104

111105

112-
-- | Wait until there is some data available from the stream.
106+
-- | Wait until there is some data available from the stream, then read it.
113107
-- |
114-
-- | This function is not currently very useful because there is no way to
115-
-- | know when a stream has already reached its end, and if this
108+
-- | This function is useful for streams like __stdin__ which never
109+
-- | reach End-Of-File.
110+
-- |
111+
-- | There is no way (?) to reliably detect with *Node.js*
112+
-- | when a stream has already reached its end, and if this
116113
-- | function is called after the stream has ended then the call will
117114
-- | never complete. So we can `readSome` one time and it will complete, but
118-
-- | then we don’t know if the next call to `readSome` will complete.
115+
-- | if the stream reached its end then the next call to `readSome`
116+
-- | will never complete.
119117
readSome
120118
:: forall m r
121119
. MonadAff m
122120
=> Readable r
123121
-> m (Array Buffer)
124122
readSome r = readSome_ r (\_ -> pure unit)
125123

126-
127124
-- | __readSome__ with a canceller argument.
128125
readSome_
129126
:: forall m r
@@ -178,7 +175,6 @@ readAll
178175
-> m (Array Buffer)
179176
readAll r = readAll_ r (\_ -> pure unit)
180177

181-
182178
-- | __readAll__ with a canceller argument.
183179
readAll_
184180
:: forall m r
@@ -311,43 +307,6 @@ readN_ r canceller n = liftAff <<< makeAff $ \res -> do
311307
tryToRead waitToRead
312308
waitToRead unit
313309

314-
-- fix \waitToRead -> do
315-
-- void $ onceReadable r do
316-
-- tryToRead waitToRead
317-
-- let
318-
-- waitToRead = do
319-
-- void $ onceReadable r do
320-
-- tryToRead waitToRead -- this is not recursion
321-
322-
323-
-- let
324-
-- oneRead = do
325-
-- void $ onceReadable r do
326-
-- catchException cleanupRethrow do
327-
-- untilE do
328-
-- red <- liftST $ STRef.read redRef
329-
-- -- https://nodejs.org/docs/latest-v15.x/api/stream.html#stream_readable_read_size
330-
-- -- “If size bytes are not available to be read, null will be returned
331-
-- -- unless the stream has ended, in which case all of the data remaining
332-
-- -- in the internal buffer will be returned.”
333-
-- Stream.read r (Just (n-red)) >>= case _ of
334-
-- Nothing -> pure true
335-
-- Just chunk -> do
336-
-- _ <- liftST $ Array.ST.push chunk bufs
337-
-- s <- Buffer.size chunk
338-
-- red' <- liftST $ STRef.modify (_+s) redRef
339-
-- if red' >= n then
340-
-- pure true
341-
-- else
342-
-- pure false
343-
-- red <- liftST $ STRef.read redRef
344-
-- if red >= n then do
345-
-- ret <- liftST $ Array.ST.unsafeFreeze bufs
346-
-- res $ Right ret
347-
-- else
348-
-- oneRead -- this is not recursion
349-
350-
-- waitToRead
351310
pure $ effectCanceler (canceller r)
352311

353312

@@ -401,41 +360,3 @@ write_ w canceller bs = liftAff <<< makeAff $ \res -> do
401360
oneWrite
402361
removeError
403362
pure $ effectCanceler (canceller w)
404-
405-
406-
-- TODO Remove all listeners before returning
407-
-- https://nodejs.org/api/events.html#emitterremovelistenereventname-listener
408-
--
409-
-- (node:483527) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 end listeners added to [Socket]. Use emitter.setMaxListeners() to increase limit
410-
-- (Use `node --trace-warnings ...` to show where the warning was created)
411-
--
412-
-- Buffer.size 28
413-
--
414-
-- (node:483527) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 error listeners added to [Socket]. Use emitter.setMaxListeners() to increase limit
415-
416-
417-
418-
-- | https://github.com/purescript-contrib/pulp/blob/79dd954c86a5adc57051cad127c8888756f680a6/src/Pulp/System/Stream.purs#L41
419-
write' :: forall m w. MonadAff m => Writable w -> String -> m Unit
420-
write' stream str = liftAff $ makeAff (\cb -> mempty <* void (Stream.writeString stream UTF8 str (\_ -> cb (Right unit))))
421-
422-
423-
-- | Prevent Node.js from exiting while an `Effect` is running.
424-
noExit :: Effect (Effect Unit)
425-
-- noExit :: forall a. (Effect unit -> Effect a) -> Effect a
426-
noExit = do
427-
-- Idea from
428-
-- https://stackoverflow.com/a/62869265/187223
429-
430-
id <- setInterval 1000 (pure unit)
431-
pure (clearInterval id)
432-
433-
-- id <- setInterval 1 (Console.log "INTERVAL\n")
434-
-- h <- hasRef id
435-
-- Console.log $ "TIMEOUT " <> show h
436-
-- pure do
437-
-- Console.log "INTERVAL END\n"
438-
-- clearInterval id
439-
440-
-- id <- setTimeout 1 do
441-
-- Console.log "TIMEOUT\n"

src/Node/Stream/Internal.js

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -27,35 +27,3 @@ export function onceError(s) {
2727
return () => {s.removeListener('error', f);};
2828
};
2929
}
30-
31-
32-
33-
export function unbuffer(s) {
34-
// https://github.com/nodejs/node/issues/6456
35-
// https://github.com/nodejs/node/issues/6379#issuecomment-1064044886
36-
// https://nodejs.org/api/process.html#a-note-on-process-io
37-
//
38-
// Maybe the stream promise API doesn't have this problem?
39-
// https://github.com/sparksuite/waterfall-cli/issues/258
40-
return () => {
41-
s._handle.setBlocking(true);
42-
};
43-
}
44-
45-
export function setInterval(t) {
46-
return f => () => {
47-
return timers.setInterval(f, t);
48-
}
49-
}
50-
51-
export function clearInterval(timeout) {
52-
return () => {
53-
timers.clearInterval(timeout);
54-
}
55-
}
56-
57-
export function hasRef(timeout) {
58-
return () => {
59-
return timeout.hasRef();
60-
}
61-
}

src/Node/Stream/Internal.purs

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,8 @@
11
module Node.Stream.Aff.Internal
2-
( Timeout
3-
, clearInterval
4-
, hasRef
5-
, onceDrain
2+
( onceDrain
63
, onceEnd
74
, onceError
85
, onceReadable
9-
, setInterval
10-
, unbuffer
116
)
127
where
138

@@ -60,25 +55,3 @@ foreign import onceError
6055
. Stream r
6156
-> (Error -> Effect Unit)
6257
-> Effect (Effect Unit)
63-
64-
-- | Issue:
65-
-- | https://github.com/nodejs/node/issues/6379
66-
-- |
67-
-- | Implementation:
68-
-- | https://github.com/nodejs/node/issues/6456
69-
-- |
70-
-- | If this fails then it will throw an `Error`.
71-
foreign import unbuffer
72-
:: forall w
73-
. Writable w
74-
-> Effect Unit
75-
-- foreign import stdoutUnbuffer :: Effect Unit
76-
77-
-- | https://nodejs.org/api/timers.html#class-timeout
78-
foreign import data Timeout :: Type
79-
80-
foreign import setInterval :: Int -> Effect Unit -> Effect Timeout
81-
82-
foreign import clearInterval :: Timeout -> Effect Unit
83-
84-
foreign import hasRef :: Timeout -> Effect Boolean

test/Main.purs

Lines changed: 16 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
-- | How to test:
22
-- |
33
-- | ```
4-
-- | spago -x spago-dev.dhall test --exec-args <(head --bytes 1000000 /dev/zero)
4+
-- | spago -x spago-dev.dhall test
55
-- | ```
66
-- |
77
-- | We want to read from a file, not stdin, because stdin has no EOF.
@@ -10,22 +10,22 @@ module Test.Main where
1010
import Prelude
1111

1212
import Data.Array as Array
13-
import Data.Either (Either(..), either)
13+
import Data.Either (Either(..))
14+
import Data.Maybe (Maybe(..))
1415
import Effect (Effect)
15-
import Effect.Aff (Error, Milliseconds(..), delay, runAff_)
16+
import Effect.Aff (Error, Milliseconds(..), runAff_)
1617
import Effect.Class (liftEffect)
1718
import Effect.Class.Console as Console
1819
import Node.Buffer (Buffer, concat)
1920
import Node.Buffer as Buffer
2021
import Node.Encoding (Encoding(..))
2122
import Node.FS.Stream (createReadStream, createWriteStream)
22-
import Node.Process (argv, stderr, stdout)
23-
import Node.Stream.Aff (noExit, readAll, readN, readSome, unbuffer, write, write')
23+
import Node.Stream.Aff (readAll, readN, readSome, write)
2424
import Partial.Unsafe (unsafePartial)
2525
import Test.Spec (describe, it)
2626
import Test.Spec.Assertions (shouldEqual)
2727
import Test.Spec.Reporter (consoleReporter)
28-
import Test.Spec.Runner (runSpec)
28+
import Test.Spec.Runner (defaultConfig, runSpecT)
2929
import Unsafe.Coerce (unsafeCoerce)
3030

3131
completion :: Either Error (Effect Unit) -> Effect Unit
@@ -35,51 +35,21 @@ completion = case _ of
3535

3636
main :: Effect Unit
3737
main = unsafePartial $ do
38-
-- Console.log $ unsafeCoerce stdout
39-
-- unbuffer stdout
40-
-- Console.log $ unsafeCoerce stdout
41-
42-
-- runAff_ (either (unsafeCoerce >>> Console.error) (\_ -> pure unit)) do
43-
exiter <- noExit
4438
runAff_ completion do
45-
write' stdout "ENTER \n"
46-
runSpec [consoleReporter] do
39+
void $ runSpecT (defaultConfig {timeout = Just (Milliseconds 10000.0)}) [consoleReporter] do
4740
describe "Node.Stream.Aff" do
48-
it "reads 1" do
49-
infile <- liftEffect $ createReadStream =<< pure <<< flip Array.unsafeIndex 2 =<< argv
50-
inputs1 <- readN infile 500000
51-
bytesRead1 :: Int <- liftEffect $ Array.foldM (\a b -> (a+_) <$> Buffer.size b) 0 inputs1
52-
shouldEqual 500000 bytesRead1
53-
inputs2 <- readSome infile
54-
inputs3 <- readAll infile
55-
let inputs = inputs1 <> inputs2 <> inputs3
56-
-- TODO read after EOF will hang
57-
-- inputs4 <- readAll infile
58-
-- inputs4 <- readSome infile
59-
-- inputs4 <- readN infile 10
60-
-- let inputs = inputs1 <> inputs2 <> inputs3 <> inputs4
61-
bytesRead :: Int
62-
<- liftEffect $ Array.foldM (\a b -> (a+_) <$> Buffer.size b) 0 inputs
63-
shouldEqual 1000000 bytesRead
64-
input :: Buffer <- liftEffect $ concat inputs
65-
inputSize <- liftEffect $ Buffer.size input
66-
shouldEqual 1000000 inputSize
67-
it "writes 1" do
68-
let outfilename = "outfile.txt"
41+
it "writes and reads" do
42+
let outfilename = "/tmp/test1.txt"
43+
let magnitude = 100000
6944
outfile <- liftEffect $ createWriteStream outfilename
7045
outstring <- liftEffect $ Buffer.fromString "aaaaaaaaaa" UTF8
71-
write outfile $ Array.replicate 1000 outstring
46+
write outfile $ Array.replicate magnitude outstring
7247
infile <- liftEffect $ createReadStream outfilename
73-
inputs <- readAll infile
48+
input1 <- readSome infile
49+
input2 <- readN infile (5 * magnitude)
50+
input3 <- readAll infile
51+
let inputs = input1 <> input2 <> input3
7452
input :: Buffer <- liftEffect $ concat inputs
7553
inputSize <- liftEffect $ Buffer.size input
76-
shouldEqual 10000 inputSize
77-
do
78-
-- b <- liftEffect $ Buffer.create 0
79-
b <- liftEffect $ Buffer.fromString "EXIT \n" UTF8
80-
write stdout $ Array.replicate 10 b
81-
-- delay (Milliseconds 2000.0)
82-
-- write' stdout "EXIT\n"
83-
-- liftEffect exiter
84-
-- pure $ Console.log "log completion"
54+
shouldEqual (10 * magnitude) inputSize
8555
pure (pure unit)

0 commit comments

Comments
 (0)