Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions Data/Pool.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ module Data.Pool
import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread, myThreadId, threadDelay)
import Control.Concurrent.STM
import Control.Exception (SomeException, onException, mask_)
import Control.Monad (forM_, forever, join, liftM5, unless, when)
import Control.Monad (forM_, forever, join, unless, when)
import Data.Hashable (hash)
import Data.IORef (IORef, newIORef, mkWeakIORef)
import Data.List (partition)
Expand Down Expand Up @@ -75,6 +75,8 @@ data PoolStats = PoolStats {
-- ^ Number of creates since last reset.
, createFailures :: Int
-- ^ Number of creation failures since last reset.
, totalWaiters :: Int
-- ^ Number of threads waiting for a resource since last reset.
} deriving (Show)

-- | Pool-wide stats.
Expand Down Expand Up @@ -103,6 +105,8 @@ data LocalPool a = LocalPool {
-- ^ threads waiting for a resource
, lfin :: IORef ()
-- ^ empty value used to attach a finalizer to (internal)
, waitersSizeVar :: TVar Int
-- ^ Number of waiters
} deriving (Typeable)

data Pool a = Pool {
Expand Down Expand Up @@ -174,7 +178,7 @@ createPool create destroy numStripes idleTime maxResources = do
when (maxResources < 1) $
modError "pool " $ "invalid maximum resource count " ++ show maxResources
localPools <- V.replicateM numStripes $
LocalPool <$> newTVarIO 0 <*> newTVarIO [] <*> newTVarIO 0 <*> newTVarIO 0 <*> newTVarIO 0 <*> newTVarIO 0 <*> newQueueIO <*> newIORef ()
LocalPool <$> newTVarIO 0 <*> newTVarIO [] <*> newTVarIO 0 <*> newTVarIO 0 <*> newTVarIO 0 <*> newTVarIO 0 <*> newQueueIO <*> newIORef () <*> newTVarIO 0
reaperId <- forkIOLabeledWithUnmask "resource-pool: reaper" $ \unmask ->
unmask $ reaper destroy idleTime localPools
fin <- newIORef ()
Expand Down Expand Up @@ -301,6 +305,7 @@ takeResource pool@Pool{..} = do
True -> do
var <- newEmptyTMVar
removeSelf <- push waiters var
modifyTVar_ waitersSizeVar (+ 1)
let getResource x = case x of
Just y -> pure (entry y)
Nothing -> create `onException` atomically (destroyResourceSTM local)
Expand Down Expand Up @@ -395,15 +400,16 @@ putResourceSTM LocalPool{..} resourceEntry = do
mWaiters <- pop waiters
case mWaiters of
Nothing -> modifyTVar_ entries (resourceEntry:)
Just w -> putTMVar w (Just resourceEntry)
Just w -> putTMVar w (Just resourceEntry) >> modifyTVar_ waitersSizeVar (subtract 1)
{-# INLINE putResourceSTM #-}

destroyResourceSTM :: LocalPool a -> STM ()
destroyResourceSTM LocalPool{..} = do
mwaiter <- pop waiters
case mwaiter of
Nothing -> modifyTVar_ inUse (subtract 1)
Just w -> putTMVar w Nothing
Just w -> putTMVar w Nothing >> modifyTVar_ waitersSizeVar (subtract 1)

{-# INLINE destroyResourceSTM #-}

-- | Destroy all resources in all stripes in the pool. Note that this
Expand All @@ -428,15 +434,15 @@ destroyAllResources Pool{..} = V.forM_ localPools $ purgeLocalPool destroy
stats :: Pool a -> Bool -> IO Stats
stats Pool{..} reset = do
let stripeStats LocalPool{..} = atomically $ do
s <- liftM5 PoolStats (readTVar highwaterVar) (readTVar inUse) (readTVar takeVar) (readTVar createVar) (readTVar createFailureVar)
s <- PoolStats <$> readTVar highwaterVar <*> readTVar inUse <*> readTVar takeVar <*> readTVar createVar <*> readTVar createFailureVar <*> readTVar waitersSizeVar
when reset $ do
mapM_ (\v -> writeTVar v 0) [takeVar, createVar, createFailureVar]
mapM_ (\v -> writeTVar v 0) [takeVar, createVar, createFailureVar, waitersSizeVar]
writeTVar highwaterVar $! currentUsage s
return s

per <- V.mapM stripeStats localPools
let poolWide = V.foldr merge (PoolStats 0 0 0 0 0) per
merge (PoolStats hw1 cu1 t1 c1 f1) (PoolStats hw2 cu2 t2 c2 f2) = PoolStats (hw1 + hw2) (cu1 + cu2) (t1 + t2) (c1 + c2) (f1 + f2)
let poolWide = V.foldr merge (PoolStats 0 0 0 0 0 0) per
merge (PoolStats hw1 cu1 t1 c1 f1 w1) (PoolStats hw2 cu2 t2 c2 f2 w2) = PoolStats (hw1 + hw2) (cu1 + cu2) (t1 + t2) (c1 + c2) (f1 + f2) (w1 + w2)
return $ Stats per poolWide

modifyTVar_ :: TVar a -> (a -> a) -> STM ()
Expand Down