28 changes: 28 additions & 0 deletions docs/src/developer/reference/config-options.md
@@ -1908,6 +1908,13 @@ time. For conversations, this is necessary for channel search and management of
channels from the team-management UI. It is highly recommended to take a backup
of the affected Cassandra data before triggering a migration.

When migrating conversations, also configure
`background-worker.config.migrateConversationsOptions.timeout`. It sets a
per-conversation upper bound on the migration attempt, so that a stuck
conversation cannot block the migration run indefinitely. Start with a value
comfortably above the normal time for one conversation migration, then adjust
it based on observed runtime and the size of your dataset.

Migrations are independent and can be run separately, in batches, or all at
once. This is expected, because migrations will be released over time. The
pattern below applies per `postgresMigration` setting. A single setting may
@@ -2082,6 +2089,13 @@ migrateConversationCodes: false
migrateTeamFeatures: false
migrateDomainRegistration: false

# conversation migration settings
migrateConversationsOptions:
  pageSize: 10000
  parallelism: 2
  # (optional) migration timeout in seconds, applies to a single conversation
  timeout: 60

# Background jobs consumer
backgroundJobs:
  concurrency: 8 # in-flight jobs per process
@@ -2092,6 +2106,20 @@ backgroundJobs:
federationDomain: example.org
```

The optional `migrateConversationsOptions.timeout` setting limits how long a single
conversation migration attempt may run after it has acquired the migration
lock. The value is a plain number of seconds, so `60` means 1 minute.
If the timeout is exceeded, that conversation migration is aborted and the
whole migration run is treated as failed.

Choose a value that is comfortably above the normal time for one conversation
migration, but still low enough to catch a genuinely stuck migration in a
reasonable time. A good starting point for most deployments is `60` seconds
(1 minute), then adjust based on observed migration durations.

If the setting is omitted, no timeout is enforced. A stalled conversation
migration can then lead to leaked exclusive advisory locks.
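
The timeout semantics described above can be sketched in plain Haskell. This is a minimal illustration under stated assumptions, not the code from this change: it uses `System.Timeout.timeout` from `base` in place of `Polysemy.Conc.timeout`, and the names `TimedOut`, `withOptionalTimeout`, and `demo` are hypothetical.

```haskell
import Control.Concurrent (threadDelay)
import Control.Exception (Exception, throwIO, try)
import System.Timeout (timeout)

-- Hypothetical stand-in for an exception that aborts the whole run.
-- It carries a label for the conversation and the limit in seconds.
data TimedOut = TimedOut String Int
  deriving (Show, Eq)

instance Exception TimedOut

-- Run one migration step under an optional limit given in whole seconds.
-- Nothing: no limit is enforced, so a stalled step blocks forever.
-- Just n: the step is interrupted after n seconds and TimedOut is thrown.
withOptionalTimeout :: Maybe Int -> String -> IO a -> IO a
withOptionalTimeout Nothing _ action = action
withOptionalTimeout (Just secs) label action = do
  -- System.Timeout.timeout expects microseconds.
  result <- timeout (secs * 1000000) action
  case result of
    Just a -> pure a
    Nothing -> throwIO (TimedOut label secs)

-- Usage: a fast step succeeds, a slow step is aborted by the limit.
demo :: IO ()
demo = do
  ok <- withOptionalTimeout (Just 5) "conv-fast" (pure "migrated")
  putStrLn ok
  stuck <- try (withOptionalTimeout (Just 1) "conv-stuck" (threadDelay 3000000))
  case stuck of
    Left (TimedOut label secs) ->
      putStrLn (label <> " timed out after " <> show secs <> "s")
    Right () -> putStrLn "finished"
```

In this sketch the exception is left to propagate, which mirrors the documented behavior that one timed-out conversation fails the whole migration run.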

Secrets

- Set `background-worker.secrets.pgPassword` to pass the PostgreSQL password. The chart mounts it to `/etc/wire/background-worker/secrets/pgPassword` and sets `postgresqlPassword` accordingly.
2 changes: 2 additions & 0 deletions libs/types-common/default.nix
@@ -36,6 +36,7 @@
, optparse-applicative
, pem
, polysemy
, polysemy-time
, protobuf
, QuickCheck
, quickcheck-instances
@@ -95,6 +96,7 @@ mkDerivation {
optparse-applicative
pem
polysemy
polysemy-time
protobuf
QuickCheck
quickcheck-instances
3 changes: 2 additions & 1 deletion libs/types-common/src/Util/Timeout.hs
@@ -26,12 +26,13 @@ import Data.Aeson.Types
import Data.Scientific
import Data.Time.Clock
import Imports
import Polysemy.Time
import Test.QuickCheck (Arbitrary (arbitrary), choose)

newtype Timeout = Timeout
{ timeoutDiff :: NominalDiffTime
}
deriving newtype (Eq, Enum, Ord, Num, Real, Fractional, RealFrac, Show)
deriving newtype (Eq, Enum, Ord, Num, Real, Fractional, RealFrac, Show, TimeUnit)

instance Arbitrary Timeout where
arbitrary = Timeout . fromIntegral <$> choose (60 :: Int, 10 * 24 * 3600)
1 change: 1 addition & 0 deletions libs/types-common/types-common.cabal
@@ -125,6 +125,7 @@ library
, optparse-applicative >=0.10
, pem
, polysemy
, polysemy-time
, protobuf >=0.2
, QuickCheck >=2.9
, quickcheck-instances >=0.3.16
77 changes: 63 additions & 14 deletions libs/wire-subsystems/src/Wire/ConversationStore/Migration.hs
@@ -25,6 +25,7 @@ import Control.Error (lastMay)
import Data.Conduit
import Data.Conduit.List qualified as C
import Data.Domain
import Data.IORef qualified as IORef
import Data.Id
import Data.IntMap qualified as IntMap
import Data.Map qualified as Map
@@ -46,12 +47,14 @@ import Polysemy.Async
import Polysemy.Conc
import Polysemy.Error
import Polysemy.Input
import Polysemy.Resource (Resource, resourceToIOFinal)
import Polysemy.Resource (Resource, bracket, resourceToIOFinal)
import Polysemy.State
import Polysemy.Time
import Polysemy.TinyLog
import Prometheus qualified
import System.Logger qualified as Log
import UnliftIO.Exception qualified as UnliftIO
import Util.Timeout
import Wire.API.Conversation hiding (Member)
import Wire.API.Conversation.CellsState
import Wire.API.Conversation.Protocol
@@ -101,15 +104,16 @@ migrateConvsLoop ::
Prometheus.Counter ->
Prometheus.Counter ->
Prometheus.Counter ->
Prometheus.Vector Text Prometheus.Histogram ->
IO ()
migrateConvsLoop migOpts cassClient pgPool logger migCounter migFinished migFailed =
migrateConvsLoop migOpts cassClient pgPool logger migCounter migFinished migFailed migDuration =
migrationLoop
logger
"conversations"
migFinished
migFailed
(interpreter cassClient pgPool logger "conversations")
(migrateAllConversations migOpts migCounter)
(migrateAllConversations migOpts migCounter migDuration)

migrateUsersLoop ::
MigrationOptions ->
@@ -157,12 +161,13 @@ migrateAllConversations ::
) =>
MigrationOptions ->
Prometheus.Counter ->
Prometheus.Vector Text Prometheus.Histogram ->
ConduitM () Void (Sem r) ()
migrateAllConversations migOpts migCounter = do
migrateAllConversations migOpts migCounter migDuration = do
lift $ info $ Log.msg (Log.val "migrateAllConversations")
withCount (paginateSem select (paramsP LocalQuorum () migOpts.pageSize) x5)
.| logRetrievedPage migOpts.pageSize runIdentity
.| C.mapM_ (unsafePooledMapConcurrentlyN_ migOpts.parallelism (handleErrors (migrateConversation migCounter) "conv"))
.| C.mapM_ (unsafePooledMapConcurrentlyN_ migOpts.parallelism (handleErrors (migrateConversationWithLock migOpts.timeout migCounter migDuration) "conv"))
where
select :: PrepQuery R () (Identity ConvId)
select = "select conv from conversation"
@@ -209,7 +214,7 @@ handleError action lockType id_ = do

-- * Conversations

migrateConversation ::
migrateConversationWithLock ::
( PGConstraints r,
Member (Input ClientState) r,
Member TinyLog r,
@@ -218,17 +223,61 @@ migrateConversation ::
Member Race r,
Member Resource r
) =>
Maybe Timeout ->
Prometheus.Counter ->
Prometheus.Vector Text Prometheus.Histogram ->
ConvId ->
Sem r ()
migrateConversation migCounter cid = do
void . withMigrationLocks LockExclusive (Seconds 10) [cid] $ do
mConvData <- withCassandra $ getAllConvData cid
for_ mConvData $ \convData -> do
saveConvToPostgres convData
withCassandra $ deleteConv convData
markDeletionComplete DeleteConv cid
liftIO $ Prometheus.incCounter migCounter
migrateConversationWithLock mTimeout migCounter migDuration cid = do
outcomeRef <- liftIO $ IORef.newIORef @Text "error"
bracket
(liftIO getCurrentTime)
(observeDuration migDuration outcomeRef)
( const do
result <-
runError $
withMigrationLocks LockExclusive (Seconds 10) [cid] $ do
case mTimeout of
Just to -> do
timeoutResult <- Polysemy.Conc.timeout (to <$ handleTimeout to) to $ migrateConversation
case timeoutResult of
Left timedOutAfter -> do
markOutcome outcomeRef "timeout"
-- this aborts the whole migration process
liftIO . UnliftIO.throwIO $ MigrationTimedOut (idToText cid) timedOutAfter
Right () -> do
markOutcome outcomeRef "success"
Nothing -> do
migrateConversation
markOutcome outcomeRef "success"

case result of
Left TimedOutAcquiringLock -> do
markOutcome outcomeRef "lock_timeout"
throw TimedOutAcquiringLock
Right () -> pure ()
)
where
migrateConversation = do
mConvData <- withCassandra $ getAllConvData cid
for_ mConvData $ \convData -> do
saveConvToPostgres convData
withCassandra $ deleteConv convData
markDeletionComplete DeleteConv cid
liftIO $ Prometheus.incCounter migCounter

handleTimeout to = do
err $
Log.msg (Log.val "conversation migrations timed out")
. Log.field "conv" (idToText cid)
. Log.field "timeout" (show to)

markOutcome ref outcome = liftIO $ IORef.writeIORef ref outcome

observeDuration metric outcomeRef start = do
outcome <- liftIO $ IORef.readIORef outcomeRef
end <- liftIO getCurrentTime
liftIO $ Prometheus.withLabel metric outcome (`Prometheus.observe` realToFrac (diffUTCTime end start))
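
The outcome-labelled duration measurement above follows a general pattern: start with a pessimistic label, overwrite it on each known path, and report it from the release step of a bracket so that it also runs on exceptions. A minimal sketch of that pattern, assuming plain `IO` with `Control.Exception.bracket` and `GHC.Clock.getMonotonicTime` from `base` instead of Polysemy's `Resource` and a Prometheus histogram. The names `timedWithOutcome`, `report`, and `demo` are hypothetical.

```haskell
import Control.Exception (ErrorCall (..), bracket, throwIO, try)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef, writeIORef)
import GHC.Clock (getMonotonicTime)

-- Run an action and pass (outcome label, elapsed seconds) to a callback,
-- even when the action throws. The label starts as "error" and only
-- changes on paths that explicitly write another outcome.
timedWithOutcome :: (String -> Double -> IO ()) -> (IORef String -> IO a) -> IO a
timedWithOutcome report action = do
  outcomeRef <- newIORef "error"
  bracket
    getMonotonicTime
    ( \start -> do
        outcome <- readIORef outcomeRef
        end <- getMonotonicTime
        report outcome (end - start)
    )
    (\_ -> action outcomeRef)

-- Usage: collect the labels reported for a successful and a failing run.
demo :: IO [String]
demo = do
  seen <- newIORef []
  let report outcome _elapsed = modifyIORef seen (outcome :)
  timedWithOutcome report (\ref -> writeIORef ref "success")
  _ <- try (timedWithOutcome report (\_ -> throwIO (ErrorCall "boom"))) :: IO (Either ErrorCall ())
  reverse <$> readIORef seen
```

Defaulting the label to `"error"` means any path that forgets to mark an outcome, or that fails unexpectedly, is still counted rather than silently dropped.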

deleteConvFromCassandra :: (Member (Input ClientState) r, Member TinyLog r, Member (Embed IO) r) => AllConvData -> Sem r ()
deleteConvFromCassandra allConvData = withCassandra $ do
12 changes: 11 additions & 1 deletion libs/wire-subsystems/src/Wire/Migration.hs
@@ -34,15 +34,25 @@ import Polysemy.TinyLog
import Prometheus qualified
import System.Logger qualified as Log
import UnliftIO qualified
import Util.Timeout (Timeout)
import Wire.Util (embedClient)

data MigrationOptions = MigrationOptions
{ pageSize :: Int32,
parallelism :: Int
parallelism :: Int,
-- optional timeout that applies to a single conversation and
-- limits how long a single conversation migration attempt may run
-- after it has acquired the migration lock
timeout :: Maybe Timeout
}
deriving (Show, Eq, Generic)
deriving (FromJSON) via Generically MigrationOptions

data MigrationTimedOut = MigrationTimedOut Text Timeout
deriving stock (Show)

instance Exception MigrationTimedOut

migrationLoop ::
Log.Logger ->
ByteString ->
6 changes: 3 additions & 3 deletions services/background-worker/src/Wire/BackgroundWorker.hs
@@ -62,21 +62,21 @@ run opts galleyOpts = do
then
runAppT env $
withNamedLogger "migrate-conversation-codes" $
Migrations.conversationCodes (MigrationOptions 1000 1)
Migrations.conversationCodes (MigrationOptions 1000 1 Nothing)
else pure $ pure ()
cleanupTeamFeaturesMigration <-
if opts.migrateTeamFeatures
then
runAppT env $
withNamedLogger "migrate-team-features" $
Migrations.teamFeatures (MigrationOptions 1000 1)
Migrations.teamFeatures (MigrationOptions 1000 1 Nothing)
else pure $ pure ()
cleanupDomainRegistrationMigration <-
if opts.migrateDomainRegistration
then
runAppT env $
withNamedLogger "migrate-domain-registration" $
Migrations.domainRegistration (MigrationOptions 1000 1)
Migrations.domainRegistration (MigrationOptions 1000 1 Nothing)
else pure $ pure ()
cleanupJobs <-
runAppT env $
3 changes: 2 additions & 1 deletion services/background-worker/src/Wire/PostgresMigrations.hs
@@ -39,11 +39,12 @@ conversations migOpts = do
convMigCounter <- register $ counter $ Prometheus.Info "wire_local_convs_migrated_to_pg" "Number of local conversations migrated to Postgresql"
convMigFinished <- register $ counter $ Prometheus.Info "wire_local_convs_migration_finished" "Whether the conversation migration to Postgresql is finished successfully"
convMigFailed <- register $ counter $ Prometheus.Info "wire_local_convs_migration_failed" "Whether the conversation migration to Postgresql has failed"
convMigDuration <- register $ vector "outcome" $ histogram (Prometheus.Info "wire_local_convs_migration_duration_seconds" "Duration of local conversation migration attempts") defaultBuckets
userMigCounter <- register $ counter $ Prometheus.Info "wire_user_remote_convs_migrated_to_pg" "Number of users whose remote conversation membership data is migrated to Postgresql"
userMigFinished <- register $ counter $ Prometheus.Info "wire_user_remote_convs_migration_finished" "Whether the migration of remote conversation membership data to Postgresql is finished successfully"
userMigFailed <- register $ counter $ Prometheus.Info "wire_user_remote_convs_migration_failed" "Whether the migration of remote conversation membership data to Postgresql has failed"

convLoop <- async . lift $ migrateConvsLoop migOpts cassClient pgPool logger convMigCounter convMigFinished convMigFailed
convLoop <- async . lift $ migrateConvsLoop migOpts cassClient pgPool logger convMigCounter convMigFinished convMigFailed convMigDuration
userLoop <- async . lift $ migrateUsersLoop migOpts cassClient pgPool logger userMigCounter userMigFinished userMigFailed

Log.info logger $ Log.msg (Log.val "started conversation migration")