Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions src/Chainweb/CutDB.hs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ makeLenses ''CutDbParams
defaultCutDbParams :: ChainwebVersion -> Int -> CutDbParams
defaultCutDbParams v ft = CutDbParams
{ _cutDbParamsInitialCutFile = Nothing
, _cutDbParamsBufferSize = (order g ^ (2 :: Int)) * diameter g
, _cutDbParamsBufferSize = max 10 $ (order g ^ (2 :: Int)) * diameter g
, _cutDbParamsLogLevel = Warn
, _cutDbParamsTelemetryLevel = Warn
, _cutDbParamsFetchTimeout = ft
Expand Down Expand Up @@ -257,7 +257,7 @@ instance Exception CutDbStopped where
--
data CutDb tbl = CutDb
{ _cutDbCut :: !(TVar Cut)
, _cutDbQueue :: !(PQueue (Down CutHashes))
, _cutDbQueue :: !(PQueue CutHashes)
, _cutDbAsync :: !(Async ())
, _cutDbLogFunction :: !LogFunction
, _cutDbHeaderStore :: !WebBlockHeaderStore
Expand Down Expand Up @@ -315,7 +315,7 @@ cut :: Getter (CutDb tbl) (IO Cut)
cut = to _cut

addCutHashes :: CutDb tbl -> CutHashes -> IO ()
addCutHashes db = pQueueInsertLimit (_cutDbQueue db) (_cutDbQueueSize db) . Down
addCutHashes db = pQueueInsert (_cutDbQueue db)

-- | An 'STM' version of '_cut'.
--
Expand Down Expand Up @@ -436,7 +436,7 @@ startCutDb config logfun headerStore payloadStore cutHashesStore = mask_ $ do
c <- readTVarIO cutVar
logg Info $ T.unlines $
"got initial cut:" : [" " <> block | block <- cutToTextShort c]
queue <- newEmptyPQueue
queue <- newEmptyPQueue _cutHashesWeight _cutHashesId (Just $ _cutDbParamsBufferSize config)
cutAsync <- asyncWithUnmask $ \u -> u $ processor queue cutVar
logg Debug "CutDB started"
return CutDb
Expand All @@ -456,7 +456,7 @@ startCutDb config logfun headerStore payloadStore cutHashesStore = mask_ $ do
wbhdb = _webBlockHeaderStoreCas headerStore
v = _chainwebVersion headerStore

processor :: PQueue (Down CutHashes) -> TVar Cut -> IO ()
processor :: PQueue CutHashes -> TVar Cut -> IO ()
processor queue cutVar = runForever logfun "CutDB" $
processCuts config logfun headerStore payloadStore cutHashesStore queue cutVar

Expand Down Expand Up @@ -548,7 +548,7 @@ processCuts
-> WebBlockHeaderStore
-> WebBlockPayloadStore tbl
-> Casify RocksDbTable CutHashes
-> PQueue (Down CutHashes)
-> PQueue CutHashes
-> TVar Cut
-> IO ()
processCuts conf logFun headerStore payloadStore cutHashesStore queue cutVar = do
Expand Down Expand Up @@ -601,7 +601,7 @@ processCuts conf logFun headerStore payloadStore cutHashesStore queue cutVar = d
hdrStore = _webBlockHeaderStoreCas headerStore

queueToStream = do
Down a <- liftIO (pQueueRemove queue)
a <- liftIO (pQueueRemove queue)
S.yield a
queueToStream

Expand Down Expand Up @@ -814,7 +814,7 @@ cutHashesToBlockHeaderMap conf logfun headerStore payloadStore hs =
return $! Left missing

origin = _cutOrigin hs
priority = Priority (- int (_cutHashesHeight hs))
priority = Priority (int (_cutHashesHeight hs))

tryGetBlockHeader hdrs plds localPayload cv@(cid, _) =
(Right <$> mapM (getBlockHeader headerStore payloadStore hdrs plds localPayload cid priority origin) cv)
Expand Down
4 changes: 2 additions & 2 deletions src/Chainweb/Sync/WebBlockHeaderStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ newWebBlockHeaderStore
-> IO WebBlockHeaderStore
newWebBlockHeaderStore mgr wdb logfun = do
m <- new
queue <- newEmptyPQueue
queue <- newEmptyPQueue _taskPriority _taskId Nothing
return $! WebBlockHeaderStore wdb m queue logfun mgr

newEmptyWebPayloadStore
Expand All @@ -569,7 +569,7 @@ newWebPayloadStore
-> LogFunction
-> IO (WebBlockPayloadStore tbl)
newWebPayloadStore mgr pact payloadDb logfun = do
payloadTaskQueue <- newEmptyPQueue
payloadTaskQueue <- newEmptyPQueue _taskPriority _taskId Nothing
payloadMemo <- new
return $! WebBlockPayloadStore
payloadDb payloadMemo payloadTaskQueue logfun mgr pact
Expand Down
80 changes: 45 additions & 35 deletions src/Data/PQueue.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- |
-- Module: Data.PQueue
Expand All @@ -14,19 +15,18 @@ module Data.PQueue
( PQueue
, newEmptyPQueue
, pQueueInsert
, pQueueInsertLimit
, pQueueRemove
, pQueueIsEmpty
, pQueueSize
) where

import Control.Concurrent.MVar
import Control.Exception (evaluate)
import Control.Concurrent.STM
import Control.Monad
import Data.Foldable

import qualified Data.Heap as H

import GHC.Generics
import Data.Ord
import qualified Data.Map as M
import qualified Data.Set as S

import Numeric.Natural

Expand All @@ -39,43 +39,53 @@ import Numeric.Natural
-- items in the queue. An item of low priority my starve in the queue if higher
-- priority items are added at a rate at least as high as items are removed.
--
data PQueue a = PQueue !(MVar ()) !(MVar (H.Heap a))
deriving (Generic)

newEmptyPQueue :: IO (PQueue a)
newEmptyPQueue = PQueue <$> newEmptyMVar <*> newMVar mempty
data PQueue a =
forall p k. (Ord p, Ord k) =>
PQueue (TVar (M.Map (Down p, k) a)) (TVar (S.Set k)) (a -> p) (a -> k) (Maybe Natural)

pQueueInsert :: Ord a => PQueue a -> a -> IO ()
pQueueInsert (PQueue s q) t = modifyMVarMasked_ q $ \h -> do
h' <- evaluate $ H.insert t h
void $ tryPutMVar s ()
return h'
newEmptyPQueue :: (Ord p, Ord k) => (a -> p) -> (a -> k) -> Maybe Natural -> IO (PQueue a)
newEmptyPQueue getPrio getKey maybeMaxLen = PQueue
<$> newTVarIO mempty
<*> newTVarIO mempty
<*> pure getPrio
<*> pure getKey
<*> pure maybeMaxLen

pQueueInsertLimit :: Ord a => PQueue a -> Natural -> a -> IO ()
pQueueInsertLimit (PQueue s q) l t = modifyMVarMasked_ q $ \h -> do
h' <- evaluate $ H.insert t h
void $ tryPutMVar s ()
return $! if H.size h > 2 * fromIntegral l
then H.take (fromIntegral l) h'
else h'
pQueueInsert :: PQueue a -> a -> IO ()
pQueueInsert (PQueue mv sv getPrio getKey maybeMaxLen) a =
atomically $ do
s <- readTVar sv
m <- readTVar mv
let k = getKey a
if S.member k s
then return ()
else do
let s' = S.insert k s
let m' = M.insert (Down $ getPrio a, k) a m
let fixup (maxlen :: Natural) = if M.size m' > fromIntegral (2 * maxlen)
then let (keep, dontkeep) = M.splitAt (fromIntegral maxlen) m'
in (foldl' (flip (S.delete . snd)) s' (M.keys dontkeep), keep)
else (s', m')
let (s'', m'') = maybe (s', m') fixup maybeMaxLen
writeTVar mv $! m''
writeTVar sv $! s''

pQueueIsEmpty :: PQueue a -> IO Bool
pQueueIsEmpty (PQueue _ q) = H.null <$!> readMVar q
pQueueIsEmpty (PQueue mv _ _ _ _) = M.null <$!> readTVarIO mv

pQueueSize :: PQueue a -> IO Natural
pQueueSize (PQueue _ q) = fromIntegral . H.size <$!> readMVar q
pQueueSize (PQueue mv _ _ _ _) = fromIntegral . M.size <$!> readTVarIO mv

-- | If the queue is empty it blocks and races for new items
--
pQueueRemove :: PQueue a -> IO a
pQueueRemove (PQueue s q) = run
pQueueRemove (PQueue mv sv _getPrio _getKey _) = atomically run
where
run = do
r <- modifyMVarMasked q $ \h -> case H.uncons h of
Nothing -> return (h, Nothing)
Just (!a, !b) -> do
when (H.null b) $ void $ tryTakeMVar s
return (b, Just a)
case r of
Nothing -> takeMVar s >> run
(Just !x) -> return x
m <- readTVar mv
case M.minViewWithKey m of
Nothing -> retry
Just (((_, k), a), m') -> do
writeTVar mv $! m'
modifyTVar' sv (S.delete k)
return a
4 changes: 2 additions & 2 deletions test/unit/Chainweb/Test/Sync/WebBlockHeaderStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,15 @@ testQueueServer limit q = forM_ [0..] $ session_ limit q (\_ _ -> return ())

withNoopQueueServer :: (PQueue (Task env a) -> IO b) -> IO b
withNoopQueueServer a = do
q <- newEmptyPQueue
q <- newEmptyPQueue _taskPriority _taskId Nothing
let failTask = do
task <- pQueueRemove q
putIVar (_taskResult task) $ Left $ []
withAsync (forever failTask) $ const $ a q

startNoopQueueServer :: IO (Async (), PQueue (Task env a))
startNoopQueueServer = do
q <- newEmptyPQueue
q <- newEmptyPQueue _taskPriority _taskId Nothing
let failTask = do
task <- pQueueRemove q
putIVar (_taskResult task) $ Left $ []
Expand Down
54 changes: 26 additions & 28 deletions test/unit/Data/Test/PQueue.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{-# language ScopedTypeVariables #-}

-- |
-- Module: Data.Test.PQueue
-- Copyright: Copyright © 2018 - 2020 Kadena LLC.
Expand All @@ -15,7 +17,6 @@ module Data.Test.PQueue
, prop_insert_remove_null
, prop_insert_remove_null_concurrent
, prop_insert_remove_sort
, prop_insert_remove_sorted_concurrent
, prop_insert_remove_concurrent
) where

Expand All @@ -25,6 +26,7 @@ import Control.Monad.IO.Class
import Data.Foldable
import qualified Data.List as L
import Data.Maybe
import Data.Ord

import Test.QuickCheck
import Test.QuickCheck.Monadic
Expand All @@ -43,7 +45,6 @@ properties =
, ("equal number of inserts and remove result in empty queue" , property $ prop_insert_remove_null)
, ("equal number of concurrent inserts and remove result in empty queue" , property $ prop_insert_remove_null_concurrent)
, ("inserting and removeing a list sorts the list", property $ prop_insert_remove_sort)
, ("concurrently inserting and removing a sorted list yields the original list" , property $ prop_insert_remove_sorted_concurrent)
, ("concurrently inserting and removing a list yields the items of original list" , property $ prop_insert_remove_concurrent)
]

Expand All @@ -52,64 +53,61 @@ properties =

prop_empty :: Property
prop_empty = once $ monadicIO $ do
q <- run newEmptyPQueue
q :: PQueue Int <- run $ newEmptyPQueue id id Nothing
x <- run (pQueueIsEmpty q)
assert x
s <- run (pQueueSize q)
assert $ s == 0

prop_insert :: [Int] -> Property
prop_insert l = monadicIO $ do
let l' = zip [0 :: Int .. ] l
s <- run $ do
q <- newEmptyPQueue
traverse_ (pQueueInsert q) l
q <- newEmptyPQueue snd fst Nothing
traverse_ (pQueueInsert q) l'
pQueueSize q
assert $ s == fromIntegral (length l)

prop_insert_remove_null :: [Int] -> Property
prop_insert_remove_null l = monadicIO $ do
q <- run newEmptyPQueue
let l' = zip [0 :: Int .. ] l
q <- run $ newEmptyPQueue snd fst Nothing
s <- run $ do
traverse_ (pQueueInsert q) l
traverse_ (const $ pQueueRemove q) l
traverse_ (pQueueInsert q) l'
traverse_ (const $ pQueueRemove q) l'
pQueueSize q
assert $ s == 0
assert =<< run (pQueueIsEmpty q)

prop_insert_remove_null_concurrent :: [Int] -> Property
prop_insert_remove_null_concurrent l = monadicIO $ do
q <- run newEmptyPQueue
let l' = zip [0 :: Int .. ] l
q <- run $ newEmptyPQueue snd fst Nothing
run $ concurrently_
(traverse_ (pQueueInsert q) l)
(traverse_ (const $ pQueueRemove q) l)
(traverse_ (pQueueInsert q) l')
(traverse_ (const $ pQueueRemove q) l')
s <- run $ pQueueSize q
assert $ s == 0
assert =<< run (pQueueIsEmpty q)

prop_insert_remove_sort :: [Int] -> Property
prop_insert_remove_sort l = monadicIO $ do
q <- run newEmptyPQueue
l' <- run $ do
traverse_ (pQueueInsert q) l
traverse (const $ pQueueRemove q) l
assert $ L.sort l == l'

prop_insert_remove_sorted_concurrent :: SortedList Int -> Property
prop_insert_remove_sorted_concurrent (Sorted l) = monadicIO $ do
q <- run newEmptyPQueue
l' <- run $ snd <$> concurrently
(traverse_ (pQueueInsert q) l)
(traverse (const $ pQueueRemove q) l)
assert $ l == l'
let l' = zip [0 :: Int .. ] l
q <- run $ newEmptyPQueue snd fst Nothing
l'' <- run $ do
traverse_ (pQueueInsert q) l'
traverse (const $ pQueueRemove q) l'
return $ L.sortOn (Down . snd) l'' === l''

prop_insert_remove_concurrent :: [Int] -> Property
prop_insert_remove_concurrent l = monadicIO $ do
q <- run newEmptyPQueue
let l' = zip [0 :: Int .. ] l
q <- run $ newEmptyPQueue snd fst Nothing
commands <- pick $ shuffle
$ (QueueInsert <$> l) ++ (QueueRemove <$ l)
l' <- run $ catMaybes
$ (QueueInsert <$> l') ++ (QueueRemove <$ l')
l'' <- run $ catMaybes
<$> mapConcurrently (runQueueCommand q) commands
assert $ L.sort l == L.sort l'
return $ l' === L.sortOn fst l''

-- -------------------------------------------------------------------------- --
-- Utils
Expand Down
Loading
Loading