Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions .github/workflows/cabal.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
cabal-flags: ""
- ghc-version: "9.6.4"
cabal-flags: ""
- ghc-version: "9.8.2"
- ghc-version: "9.8.4"
cabal-flags: ""
- ghc-version: "9.10.1"
cabal-flags: ""
Expand Down Expand Up @@ -77,4 +77,10 @@ jobs:
# We have seen in the past some tests hang for hours, wasting resources.
# The timeout below should be plenty
timeout-minutes: 10
run: cabal test all ${{matrix.cabal-flags}}
run: cabal test all ${{matrix.cabal-flags}}

- name: Run Cloud Haskell QUIC tests
# We have seen in the past some tests hang for hours, wasting resources.
# The timeout below should be plenty
timeout-minutes: 10
run: cabal test distributed-process-tests:TestCHInQUIC -f quic ${{matrix.cabal-flags}}
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ as well as libraries for networking, including:

* [network-transport](https://github.com/haskell-distributed/distributed-process/tree/master/packages/network-transport)
* [network-transport-tcp](https://github.com/haskell-distributed/distributed-process/tree/master/packages/network-transport-tcp)
* [network-transport-quic](https://github.com/haskell-distributed/distributed-process/tree/master/packages/network-transport-quic)
* [network-transport-inmemory](https://github.com/haskell-distributed/distributed-process/tree/master/packages/network-transport-inmemory)

See http://haskell-distributed.github.io for documentation, user guides, tutorials and assistance.
Expand Down
4 changes: 3 additions & 1 deletion cabal.project
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
packages: packages/*/**.cabal

package distributed-process-tests
flags: +tcp
-- There is also the +quic flag, but for some reason,
-- QUIC-based tests aren't running correctly in CI at this time
flags: +tcp
20 changes: 20 additions & 0 deletions packages/distributed-process-tests/distributed-process-tests.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ maintainer: The Distributed Haskell team
copyright: Well-Typed LLP
category: Control, Cloud Haskell
build-type: Simple
extra-source-files: tests/credentials/*

source-repository head
Type: git
Expand All @@ -21,6 +22,10 @@ flag tcp
Description: build and run TCP tests
Default: False

flag quic
Description: build and run QUIC tests
Default: False

common warnings
ghc-options: -Wall
-Wcompat
Expand Down Expand Up @@ -99,6 +104,21 @@ Test-Suite TestCHInTCP
ghc-options: -threaded -rtsopts -with-rtsopts=-N -fno-warn-unused-do-bind
HS-Source-Dirs: tests

Test-Suite TestCHInQUIC
import: warnings
Type: exitcode-stdio-1.0
Main-Is: runQUIC.hs
if flag(quic)
Build-Depends: base >= 4.14 && < 5,
distributed-process-tests,
filepath,
network-transport-quic,
tasty >= 1.5 && <1.6,
else
Buildable: False
default-language: Haskell2010
ghc-options: -threaded -rtsopts -with-rtsopts=-N
HS-Source-Dirs: tests

Test-Suite TestClosure
import: warnings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import Control.Distributed.Process.Tests.Internal.Utils (pause)
import Control.Distributed.Process.Serializable (Serializable)
import Data.Maybe (isNothing, isJust)
import Test.Tasty (TestTree, testGroup)
import Test.Tasty.HUnit (Assertion, assertBool, assertEqual, testCase)
import Test.Tasty.HUnit (Assertion, assertBool, assertEqual, testCase, assertFailure)

newtype Ping = Ping ProcessId
deriving (Typeable, Binary, Show)
Expand Down Expand Up @@ -220,11 +220,11 @@ testPing TestTransport{..} = do
p <- expectTimeout 3000000
case p of
Just (Ping _) -> return ()
Nothing -> die "Failed to receive Ping"
Nothing -> let msg = "Failed to receive Ping" in liftIO (putMVar clientDone (Left msg)) >> die msg

putMVar clientDone ()
putMVar clientDone (Right ())

takeMVar clientDone
takeMVar clientDone >>= either assertFailure pure

-- | Monitor a process on an unreachable node
testMonitorUnreachable :: TestTransport -> Bool -> Bool -> Assertion
Expand Down
22 changes: 22 additions & 0 deletions packages/distributed-process-tests/tests/credentials/cert.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-----BEGIN CERTIFICATE-----
MIIDoTCCAomgAwIBAgIUVp3lTRQWZSOwolWHNaghO6gR68owDQYJKoZIhvcNAQEL
BQAwRTESMBAGA1UEAwwJMTI3LjAuMC4xMQswCQYDVQQGEwJDQTEPMA0GA1UECAwG
UXVlYmVjMREwDwYDVQQHDAhNb250cmVhbDAgFw0yNTA4MTgwMDU1MDRaGA8yMTI1
MDcyNTAwNTUwNFowRTESMBAGA1UEAwwJMTI3LjAuMC4xMQswCQYDVQQGEwJDQTEP
MA0GA1UECAwGUXVlYmVjMREwDwYDVQQHDAhNb250cmVhbDCCASIwDQYJKoZIhvcN
AQEBBQADggEPADCCAQoCggEBAORALZlg9Qmu+A2HT4MUjF1iGUdWF6tlRgF6+zLZ
uvuSM+eR0yH+EJZB2xqanzkXHVAkAnHPWRZ2HWqTS7TLOMyRdPEkiCg+WmW2f0t0
hNCjZVMviahQgOwHkbTZbfsUHTv65cEk4XCgvQXFteMC+Q3lCeXWGoeMOt7AZ3ld
vf7jgmPTQXOQFhqa9q5Qcxn+b1+2NBgQXqEQTVARBLPbCB4M0SKLZ4fWK4VHZsbe
k8fUJBGgz/gTDNNClUiVBhBiv/9uvunZRpU1QBN5tZYXAPc0hX608L33R+LFsoDM
cO5+j+XIjvxWNk94cmM/cb4PLlZBeNBlXxWxY1lKAxjja58CAwEAAaOBhjCBgzAd
BgNVHQ4EFgQUGj/6Vt/0fjbTGBHPZNRIxJywRnkwHwYDVR0jBBgwFoAUGj/6Vt/0
fjbTGBHPZNRIxJywRnkwDgYDVR0PAQH/BAQDAgWgMCAGA1UdJQEB/wQWMBQGCCsG
AQUFBwMBBggrBgEFBQcDAjAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUA
A4IBAQA+AuoFBODpWaWrVSjdGZPHP4DtlhB9jDy0WmUBJ8BxeB8SooJoyTsBXVhq
7ACKp11rxJPk9Tv9JOsRrWi+YLzgs+QsKpUKb6RK5nszz17K1md8BavGzE4n/e0F
tzYvWAeyIazHW551GMB1MkpSVcsJNqe91z35qmykmwIo8h+BgqTFzUFiln6bLnqP
KxrWKdlVh2BGEVbH5APClQii0bX1qEn0A8CkAMbldC1GNFbfhyxk1v+8CVK1M6Nx
BrTe15/CVTw/ceCfFZra4DinsflyCP+CcitGOUhWKgrUSiyN8xtr+Wopq4+ntm/Z
ku6j3frrSJnT9A+nZyyGvZlSPrxf
-----END CERTIFICATE-----
28 changes: 28 additions & 0 deletions packages/distributed-process-tests/tests/credentials/cert.key
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDkQC2ZYPUJrvgN
h0+DFIxdYhlHVherZUYBevsy2br7kjPnkdMh/hCWQdsamp85Fx1QJAJxz1kWdh1q
k0u0yzjMkXTxJIgoPlpltn9LdITQo2VTL4moUIDsB5G02W37FB07+uXBJOFwoL0F
xbXjAvkN5Qnl1hqHjDrewGd5Xb3+44Jj00FzkBYamvauUHMZ/m9ftjQYEF6hEE1Q
EQSz2wgeDNEii2eH1iuFR2bG3pPH1CQRoM/4EwzTQpVIlQYQYr//br7p2UaVNUAT
ebWWFwD3NIV+tPC990fixbKAzHDufo/lyI78VjZPeHJjP3G+Dy5WQXjQZV8VsWNZ
SgMY42ufAgMBAAECggEAGfwodM6x9tFBkiC2b6DWPgdeA14Mwcl8x8xdbrOU8vD5
EcLrO3J2JvUGYaf6uoAkKSyATr6hUMpPnQN52fJM3BUvMAjNq2810WCOa2OvfyUq
8uZ1kIDhvH08HE+okq3+igaNQ4jUVYMnIdIZW+fJvMg3cUAHsyjGxvc2kH2YlLzQ
3zxEFacnTb2K/Sxa/rFC7O3r2M6casTVsqfLyeShnSLEwLLk8tzCZZc6Sap9rVgh
CIcUhZFGxLYWMBJwRs68rmgT7rvQvh8NxzDMGM9Z/AQzeeHAvjAkb4gZBu+W69vD
CYjMi3cchdG/2ouYqijdv9DcqRDfz6BDwf8fT96dyQKBgQD0rGreqY7E8Wnt3EjF
TYwi6Hj7r6gMw3kdIIJ49st2lTvOmeZpvJX7DOh43NNidx9q2Ai1XCCEDQlpPS7i
UnqOLwX0gGYZjYkI8QSdNbJ9T4wepfSeox7dte/xnglEkfipHV3tLqhurgw+wvGW
52hBB6DVSumzjcG/hrvkDth31QKBgQDu0SMH5mg4L4KaT9+qZm3IW+Xey3vwPFES
w4bGsmAddzxXRIw6+ut2+AX/WSccUnZmgtiKKzS1yrBXGa98dqzjGRcDnbchkm+6
Ka1s3ZSx7cjgya43jLIZ9ycwva8+OPPfzrOB6zLgIauwi5B7JsB1Qt81AXeo5/jb
S64FRXkjowKBgChebj+QoEK0RjL9nnAXTGDSFGwKXmLEua3pmD1XEtjc5IJA+DhH
6kMCrTSL0sCzQNbDECTEL4U6FWxssNicnSXqckQWD0J2DL8R7R33JxzvzAGehg7K
gSQ5iX5HAeZzYyCb/MxOX3Hre4+7YFrykUvxc0Ld2lNKt0XfeA63uFWFAoGAOMfk
ylYP5Xv2U3Y2Oa+M3pxq9SPwXdgZdpqiis+SZq8Y267ioItUPL8PvfyWffdlS05E
6eUH7Uk50Bu9S5xz0rL+c8+l4QeOJPcP0tiEKCHfJwMMtwxutBm9aatP5T1pToc4
yuT+/adDyQAF5CH8lGTH6TRmHPS6iHlf8MTp3n0CgYEAwUWjiimBoPQV3X2mHYp5
yXBKGrsEItOmZUKYpl9UGVdGHHuZqzKi5ckOUK+vfd2uH9toUBMFK5aBM3VmFWPb
3IpTrYe/Zu545dZszESjpl9JeiiSOVvPllCh0BrOAK1TwRapWUTsS8ut5pt5zLuo
VbKNvUzMHtq6vp511AD0zCY=
-----END PRIVATE KEY-----
55 changes: 55 additions & 0 deletions packages/distributed-process-tests/tests/runQUIC.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{-# LANGUAGE LambdaCase #-}
-- Run tests using the QUIC transport.
--
module Main where

import Control.Exception (throwIO)
import Data.List.NonEmpty (NonEmpty(..))
import Network.Transport.QUIC
( createTransport,
credentialLoadX509,
QUICTransportConfig(..) )
import Network.Transport.Test (TestTransport (..))
import System.IO
( hSetBuffering, stderr, stdout, BufferMode(LineBuffering) )
import Control.Distributed.Process.Tests.CH (tests)
import Test.Tasty (defaultMain, localOption)
import Test.Tasty.Runners (NumThreads)
import System.FilePath ((</>))

main :: IO ()
main = do
hSetBuffering stdout LineBuffering
hSetBuffering stderr LineBuffering
credentialLoadX509
-- Generate a self-signed x509v3 certificate using this nifty tool:
-- https://certificatetools.com/
("tests" </> "credentials" </> "cert.crt")
("tests" </> "credentials" </> "cert.key")
>>= \case
Left errmsg -> throwIO (userError errmsg)
Right creds -> do
transport <-
createTransport
( QUICTransportConfig
{ hostName = "127.0.0.1",
serviceName = "0",
credentials = creds :| [],
validateCredentials = False -- self-signed certificates cannot be validated
}
)
ts <-
tests
TestTransport
{ testTransport = transport,
testBreakConnection = \_ _ -> pure () -- I'm not sure how to break the connection at this time
}
-- Tests are time sensitive. Running the tests concurrently can slow them
-- down enough that threads using threadDelay would wake up later than
-- expected, thus changing the order in which messages were expected.
-- Therefore we run the tests sequentially
--
-- The problem was first detected with
-- 'Control.Distributed.Process.Tests.CH.testMergeChannels'
-- in particular.
defaultMain (localOption (1 :: NumThreads) ts)
20 changes: 20 additions & 0 deletions packages/network-transport-quic/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
Copyright (c) Laurent P. René de Cotret

Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:

The above copyright notice and this permission notice shall be included
in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
154 changes: 154 additions & 0 deletions packages/network-transport-quic/bench/Bench.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Main where

import Control.Concurrent (forkIO)
import Control.Concurrent.Async (forConcurrently_)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (finally, throwIO)
import Control.Monad (forM_, replicateM, void, when)
import qualified Data.ByteString as BS
import Data.IORef (
atomicModifyIORef',
newIORef,
)
import Data.List.NonEmpty (NonEmpty (..))
import Network.Transport (
Connection (send),
EndPoint (address, connect, receive),
Event (ConnectionOpened, Received),
Reliability (ReliableOrdered),
Transport (closeTransport, newEndPoint),
defaultConnectHints,
)
import qualified Network.Transport.QUIC as QUIC
import qualified Network.Transport.TCP as TCP
import System.FilePath ((</>))
import Test.Tasty (TestTree)
import Test.Tasty.Bench (bench, bgroup, defaultMain, nfIO)

data TransportConfig = TransportConfig
{ transportName :: String
, mkTransport :: IO Transport
}

tcpConfig :: TransportConfig
tcpConfig =
TransportConfig
{ transportName = "TCP"
, mkTransport = do
Right t <- TCP.createTransport (TCP.defaultTCPAddr "127.0.0.1" "0") TCP.defaultTCPParameters
pure t
}

quicConfig :: TransportConfig
quicConfig =
TransportConfig
{ transportName = "QUIC"
, mkTransport =
QUIC.credentialLoadX509
-- Generate a self-signed x509v3 certificate using this nifty tool:
-- https://certificatetools.com/
("test" </> "credentials" </> "cert.crt")
("test" </> "credentials" </> "cert.key")
>>= \case
Left errmsg -> throwIO $ userError errmsg
Right credentials ->
QUIC.createTransport "127.0.0.1" "0" (credentials :| [])
}

data BenchParams = BenchParams
{ messageSize :: !Int
, messageCount :: !Int
, connectionCount :: !Int
}

smallMessages, mediumMessages, largeMessages :: BenchParams
smallMessages = BenchParams{messageSize = 64, messageCount = 10_000, connectionCount = 1}
mediumMessages = BenchParams{messageSize = 1024, messageCount = 1_000, connectionCount = 1}
largeMessages = BenchParams{messageSize = 4096, messageCount = 100, connectionCount = 1}

multiConn :: Int -> BenchParams -> BenchParams
multiConn n p = p{connectionCount = n}

throughputBench :: TransportConfig -> BenchParams -> IO ()
throughputBench TransportConfig{mkTransport} BenchParams{messageSize, messageCount, connectionCount} = do
transport <- mkTransport
flip finally (closeTransport transport) $ do
Right senderEP <- newEndPoint transport
Right receiverEP <- newEndPoint transport

let payload = BS.replicate messageSize 0x42
totalMessages = messageCount * connectionCount

receiverReady <- newEmptyMVar
receiverDone <- newEmptyMVar

void $ forkIO $ do
connsEstablished <- newIORef (0 :: Int)
let waitForConnections = do
event <- receive receiverEP
case event of
ConnectionOpened{} -> do
n <- atomicModifyIORef' connsEstablished (\x -> (x + 1, x + 1))
when (n < connectionCount) waitForConnections
_ -> waitForConnections
waitForConnections
putMVar receiverReady ()

msgsReceived <- newIORef (0 :: Int)
let recvLoop = do
event <- receive receiverEP
case event of
Received _ _ -> do
n <- atomicModifyIORef' msgsReceived (\x -> (x + 1, x + 1))
when (n < totalMessages) recvLoop
_ -> recvLoop
recvLoop
putMVar receiverDone ()

let receiverAddr = address receiverEP
connections <-
replicateM
connectionCount
( connect senderEP receiverAddr ReliableOrdered defaultConnectHints >>= either throwIO pure
)

takeMVar receiverReady

forConcurrently_ connections $ \conn ->
forM_ [0 .. messageCount] $ \_ -> send conn [payload]

takeMVar receiverDone

benchTransport :: TransportConfig -> TestTree
benchTransport cfg@TransportConfig{transportName} =
bgroup
transportName
[ bgroup
"throughput"
[ bgroup
"single-connection"
[ bench "small-msg" $ nfIO $ throughputBench cfg smallMessages
, bench "default-msg" $ nfIO $ throughputBench cfg mediumMessages
, bench "large-msg" $ nfIO $ throughputBench cfg largeMessages
]
, bgroup
"multi-connection"
[ bench "2-conn" $ nfIO $ throughputBench cfg smallMessages{connectionCount = 2, messageCount = 10_000}
, bench "5-conn" $ nfIO $ throughputBench cfg smallMessages{connectionCount = 5, messageCount = 10_000}
, bench "10-conn" $ nfIO $ throughputBench cfg smallMessages{connectionCount = 10, messageCount = 5_000}
]
]
]

main :: IO ()
main =
defaultMain
[ benchTransport tcpConfig
, benchTransport quicConfig
]
Loading
Loading