-
Notifications
You must be signed in to change notification settings - Fork 57
Comparison with scrive/master
#45
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Downgrade local patched version to 0.2.3.2.1
Rewrite based on Control.Concurrent.QSem for better performance
parsonsmatt
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implementation look solid!
Data/Pool/Introspection.hs
Outdated
| -- | Method of acquiring a resource from the pool. | ||
| data AcquisitionMethod | ||
| = Created | ||
| -- ^ A new resource was created. | ||
| | Taken | ||
| -- ^ An existing resource was directly taken from the pool. | ||
| | WaitedThen !AcquisitionMethod | ||
| -- ^ The thread had to wait until a resource was released. The inner method | ||
| -- signifies whether the resource was returned to the pool via 'putResource' | ||
| -- ('Taken') or 'destroyResource' ('Created'). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this type is a linked list that eventually terminates in a Created | Taken. Both use-sites do WaitedThen (Created|Taken), so I think the linked-list approach is probably not correct.
| -- | Method of acquiring a resource from the pool. | |
| data AcquisitionMethod | |
| = Created | |
| -- ^ A new resource was created. | |
| | Taken | |
| -- ^ An existing resource was directly taken from the pool. | |
| | WaitedThen !AcquisitionMethod | |
| -- ^ The thread had to wait until a resource was released. The inner method | |
| -- signifies whether the resource was returned to the pool via 'putResource' | |
| -- ('Taken') or 'destroyResource' ('Created'). | |
| -- | Method of acquiring a resource from the pool. | |
| data AcquisitionMethod | |
| = Created | |
| -- ^ A new resource was created. | |
| | Taken | |
| -- ^ An existing resource was directly taken from the pool. | |
| data AcquisitionResult | |
| | Immediate !AcquisitionMethod | |
| -- ^ The thread had to wait until a resource was released. The inner method | |
| -- signifies whether the resource was returned to the pool via 'putResource' | |
| -- ('Taken') or 'destroyResource' ('Created'). | |
| | Waited DiffTime !AcquisitionMethod |
Or, factoring it out,
data AcquisitionResult = AcquisitionResult
{ acquisitionResultDelay :: !(Maybe DiffTime)
-- ^ If 'Just', then the amount of time spent waiting on a resource to become available.
-- If 'Nothing', then the resource was available immediately.
, acquisitionResultMethod :: !AcquisitionMethod
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, that DiffTime is already present on the Resource datatype, and indicates the total time spent waiting, which would also include the time spent waiting on non-WaitedThen time. I think these are probably worthwhile things to have separately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, the current data type allows for more states that occur in practice. Will change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for addressing my feedback 😄
| -- | Get a capability-local pool. | ||
| getLocalPool :: SmallArray (LocalPool a) -> IO (LocalPool a) | ||
| getLocalPool pools = do | ||
| (cid, _) <- threadCapability =<< myThreadId | ||
| pure $ pools `indexSmallArray` (cid `rem` sizeofSmallArray pools) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In investigating the behavior of takeResource and whether or not any other thread might have access to the LocalPool's mstripe MVar, I am investigating this function.
threadCapability returns the system thread number, so if you run with -RTS -N4, then you'll have four capabilities to work with, and forked green threads will be running on a given capability.
The Bool parameter that we are returning indicates whether the given ThreadId is locked to that specific capability, which is set by using forkOn. This is not the default means of forking threads, so it will probably be the case that forked threads will be able to switch capabilities.
We're mapping a relatively small number of capabilites (generally not more than nproc, which is 8 on my laptop and possibly 64 or more on large servers) into this SmallArray (LocalPool a). What is the size of the array? The code that produces it is here:
numStripes <- getNumCapabilities
when (numStripes < 1) $ do
error "numStripes must be at least 1"
pools <- fmap (smallArrayFromListN numStripes) . forM [1..numStripes] $ \n -> do
LocalPool n <$> newMVar Stripe
{ available = maxResources `quotCeil` numStripes
, cache = []
, queue = Empty
, queueR = Empty
}So, we have an SmallArray (LocalPool a) where the length of the array is equal to the number of capabilities. If we're running with -N1, then we have one LocalPool, and all threads share access to that local pool. If we have -N8 then we have 8 system threads and 8 entries in our LocalPool, one for each system thread.
We do cid `rem` sizeOfSmallArrayPools to be resilient to the setNumCapabilities function which can change the number of system threads that the runtime is using. Supposing we started off with -N8 and then did setNumCapabilities 4. We'd have a SmallArray of size 8, but then we'd only have 4 capabilities, so we'd have four LocalPools that would never be used. If we increased our capabilities (say -N2 and then setNumCapabilities 4), then we'd have a LocalPool with 2 entries and 4 system threads, so we'd want to map 4 `rem` 2 to give relatively fair access to the LocalPools.
This does not prevent different green threads from accessing the same LocalPool. Indeed, this design ensures that a LocalPool will be seen by all green threads that share the same system thread, or capability.
Data/Pool/Introspection.hs
Outdated
| t1 <- getMonotonicTime | ||
| localPool@(LocalPool n mstripe) <- getLocalPool (localPools pool) | ||
| stripe <- takeMVar mstripe | ||
| if available stripe == 0 | ||
| then do | ||
| q <- newEmptyMVar | ||
| putMVar mstripe $! stripe { queueR = Queue q (queueR stripe) } | ||
| waitForResource mstripe q >>= \case | ||
| Just a -> do | ||
| t2 <- getMonotonicTime | ||
| pure (Resource a n (t2 - t1) (WaitedThen Taken) 0, localPool) | ||
| Nothing -> do | ||
| a <- createResource pool `onException` restoreSize mstripe | ||
| t2 <- getMonotonicTime | ||
| pure (Resource a n (t2 - t1) (WaitedThen Created) 0, localPool) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To add in the time spent WaitedThen (in accordance with the above comment), we'd want a third getMonotonicTime call. Where should it go?
There are a few things here that can block after t1 <- getMonotonicTime. We don't want to record it before getLocalPool because then it's exactly equivalent to t1.
stripe <- takeMVar mstripeIf the mstripe has not been filled in, then this will block until it has a value (and this is the thread that is in-line to receive that value). However, this is something we're doing regardless of the WaitedThen constructor, so it seems like the expectation is that it would not count towards it.
putMVar mstripe $! stripe { queueR = Queue q (queueR stripe) }We're in the branch where we return a WaitedThen constructor, so we could feasibly record the time before this line executes.
This one should not block, since we just did a takeMVar above. In order for this to block, some other thread would have to have done a putMVar on the mstripe in that LocalPool. getLocalPool allows multiple threads to receive the same LocalPool. However, getLocalPool is also only called in takeResource, so I think we can imagine this with only the sequence of events in takeResource. If we have two simultaneous calls to takeResource on the same capability, then the first one is going to do takeMVar mstripe, which will empty the MVar. The second one will call takeMVar mstripe and block. Then this one calls putMVar. Nothing else is doing a putMVar on this variable, since takeResource is the only place we're calling getLocalPool to gain access one in the first place. So the first putMVar succeeds, and our thread 2 can now finish takeMVar and proceed.
I feel pretty convinced that this won't block, so the intermediate timeWaiting can go after this line.
waitForResource mstrip q >>= \caseThis one is definitely going to block, it's in the name. It's takeMVar with a special exception cleanup handler.
a <- createResource pool `onException` restoreSize mstripeThis will take some time to complete, so we want to capture the elapsed time here, for sure.
Based on this analysis, I'd probably put it after putMVar mstripe and waitForResource.
| t1 <- getMonotonicTime | |
| localPool@(LocalPool n mstripe) <- getLocalPool (localPools pool) | |
| stripe <- takeMVar mstripe | |
| if available stripe == 0 | |
| then do | |
| q <- newEmptyMVar | |
| putMVar mstripe $! stripe { queueR = Queue q (queueR stripe) } | |
| waitForResource mstripe q >>= \case | |
| Just a -> do | |
| t2 <- getMonotonicTime | |
| pure (Resource a n (t2 - t1) (WaitedThen Taken) 0, localPool) | |
| Nothing -> do | |
| a <- createResource pool `onException` restoreSize mstripe | |
| t2 <- getMonotonicTime | |
| pure (Resource a n (t2 - t1) (WaitedThen Created) 0, localPool) | |
| t1 <- getMonotonicTime | |
| localPool@(LocalPool n mstripe) <- getLocalPool (localPools pool) | |
| stripe <- takeMVar mstripe | |
| if available stripe == 0 | |
| then do | |
| q <- newEmptyMVar | |
| putMVar mstripe $! stripe { queueR = Queue q (queueR stripe) } | |
| waitingOnResourceTime <- getMonotonicTime | |
| waitForResource mstripe q >>= \case | |
| Just a -> do | |
| t2 <- getMonotonicTime | |
| pure (Resource a n (t2 - t1) (AcquisitionResult (Just (t2 - waitingOnResourceTime) Taken) 0, localPool) | |
| Nothing -> do | |
| a <- createResource pool `onException` restoreSize mstripe | |
| t2 <- getMonotonicTime | |
| pure (Resource a n (t2 - t1) (AcqusitionResult (Just (t2 - waitingOnResourceTime) Created) 0, localPool) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added separate measurement for waiting and resource creation.
Data/Pool/Introspection.hs
Outdated
| -- | A resource taken from the pool along with additional information. | ||
| data Resource a = Resource | ||
| { resource :: a | ||
| , stripeNumber :: !Int | ||
| , acquisitionTime :: !Double | ||
| , acquisitionMethod :: !AcquisitionMethod | ||
| , availableResources :: !Int | ||
| } deriving (Eq, Show, Generic) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this type gives us the resource stats. Notably we get to know how much time was spent on acquiring this resource, whether or not we had to wait on a resource to become available, and how many more resources are currently available in the LocalPool stripe.
Comparing with #24, which introduced this type:
-- | Stats for a single 'LocalPool'.
data PoolStats = PoolStats {
highwaterUsage :: Int
-- ^ Highest usage since last reset.
, currentUsage :: Int
-- ^ Current number of items.
, takes :: Int
-- ^ Number of takes since last reset.
, creates :: Int
-- ^ Number of creates since last reset.
, createFailures :: Int
-- ^ Number of creation failures since last reset.
} deriving (Show)We don't get these stats, and that's what we're looking for (at least, in our use at Mercury).
We can get creates by modifying the function passed in newPool to increment a TVar on our side, but that won't be LocalPool specific - it'll be limited to the Pool as a whole. Probably same with createFailures. We cannot get currentUsage with the API in Pool already, nor can we get takes. We are not using highwaterUsage since we did not want to reset the pool metrics each time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's hard to make a set of stats to satisfy everyone and not blow it up 🙂
But since everything from the .Internal module is exposed, I think writing a version of takeResource that would work for you is not a problem?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point! THanks for making that easy for us 😄
* Add PoolConfig * Rename poolCapacity to poolMaxResources * Improve doc
* Adjust stats * Fix a typo * Fix a typo
I created this PR primarily so that I can easily compare the differences between this and
scrive's upstream fork. May also be useful for others.