Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { z } from 'zod'

import { captureApiChange, memberEditAffiliationsAction } from '@crowd/audit-logs'
import { NotFoundError } from '@crowd/common'
import { CommonMemberService } from '@crowd/common_services'
import { signalMemberUpdate } from '@crowd/common_services'
import {
MemberField,
deleteAllMemberSegmentAffiliationsForProject,
Expand Down Expand Up @@ -75,6 +75,10 @@ export async function patchProjectAffiliation(req: Request, res: Response): Prom
memberEditAffiliationsAction(memberId, async (captureOldState, captureNewState) => {
captureOldState(existingAffiliations)

const oldOrgIds = existingAffiliations.map((a) => a.organizationId)
const newOrgIds = affiliations.map((a) => a.organizationId)
const orgIdsToRecalculate = [...new Set([...oldOrgIds, ...newOrgIds])]

await qx.tx(async (tx) => {
await deleteAllMemberSegmentAffiliationsForProject(tx, memberId, projectId)

Expand All @@ -91,13 +95,11 @@ export async function patchProjectAffiliation(req: Request, res: Response): Prom
})),
)
}
})

const oldOrgIds = existingAffiliations.map((a) => a.organizationId)
const newOrgIds = affiliations.map((a) => a.organizationId)
const orgIdsToRecalculate = [...new Set([...oldOrgIds, ...newOrgIds])]

const service = new CommonMemberService(tx, req.temporal, req.log)
await service.startAffiliationRecalculation(memberId, orgIdsToRecalculate)
// Signal after commit so the workflow sees persisted changes
await signalMemberUpdate(req.temporal, memberId, {
memberOrganizationIds: orgIdsToRecalculate,
})

updatedAffiliations = await fetchMemberSegmentAffiliationsForProject(qx, memberId, projectId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
NotFoundError,
sanitizeMemberOrganizationDateRange,
} from '@crowd/common'
import { CommonMemberService } from '@crowd/common_services'
import { signalMemberUpdate } from '@crowd/common_services'
import {
MemberField,
changeMemberOrganizationAffiliationOverrides,
Expand Down Expand Up @@ -105,9 +105,11 @@ export async function createMemberWorkExperience(req: Request, res: Response): P
},
])
}
})

const service = new CommonMemberService(tx, req.temporal, req.log)
await service.startAffiliationRecalculation(memberId, [data.organizationId])
// Signal after commit so the workflow sees persisted changes
await signalMemberUpdate(req.temporal, memberId, {
memberOrganizationIds: [data.organizationId],
})

const orgsMap = await fetchManyMemberOrgsWithOrgData(qx, [memberId])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { z } from 'zod'

import { captureApiChange, memberEditOrganizationsAction } from '@crowd/audit-logs'
import { NotFoundError } from '@crowd/common'
import { CommonMemberService } from '@crowd/common_services'
import { signalMemberUpdate } from '@crowd/common_services'
import {
MemberField,
deleteMemberOrganizations,
Expand Down Expand Up @@ -46,10 +46,11 @@ export async function deleteMemberWorkExperience(req: Request, res: Response): P

await qx.tx(async (tx) => {
await deleteMemberOrganizations(tx, memberId, [workExperienceId])
const commonMemberService = new CommonMemberService(tx, req.temporal, req.log)
await commonMemberService.startAffiliationRecalculation(memberId, [
memberOrg.organizationId,
])
})

// Signal after commit so the workflow sees persisted changes
await signalMemberUpdate(req.temporal, memberId, {
memberOrganizationIds: [memberOrg.organizationId],
})

captureNewState(null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { z } from 'zod'

import { captureApiChange, memberEditOrganizationsAction } from '@crowd/audit-logs'
import { BadRequestError, NotFoundError, sanitizeMemberOrganizationDateRange } from '@crowd/common'
import { CommonMemberService } from '@crowd/common_services'
import { signalMemberUpdate } from '@crowd/common_services'
import {
MemberField,
cleanSoftDeletedMemberOrganization,
Expand Down Expand Up @@ -80,9 +80,11 @@ export async function updateMemberWorkExperience(req: Request, res: Response): P
await qx.tx(async (tx) => {
await cleanSoftDeletedMemberOrganization(tx, memberId, data.organizationId, update)
await updateMemberOrganization(tx, memberId, workExperienceId, update)
})

const service = new CommonMemberService(tx, req.temporal, req.log)
await service.startAffiliationRecalculation(memberId, [data.organizationId])
// Signal after commit so the workflow sees persisted changes
await signalMemberUpdate(req.temporal, memberId, {
memberOrganizationIds: [data.organizationId],
})

const orgsMap = await fetchManyMemberOrgsWithOrgData(qx, [memberId])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { z } from 'zod'

import { captureApiChange, memberVerifyWorkExperienceAction } from '@crowd/audit-logs'
import { NotFoundError } from '@crowd/common'
import { CommonMemberService } from '@crowd/common_services'
import { signalMemberUpdate } from '@crowd/common_services'
import {
MemberField,
deleteMemberOrganizations,
Expand Down Expand Up @@ -62,12 +62,16 @@ export async function verifyMemberWorkExperience(req: Request, res: Response): P
})
} else {
await deleteMemberOrganizations(tx, memberId, [workExperienceId], true)

const service = new CommonMemberService(tx, req.temporal, req.log)
await service.startAffiliationRecalculation(memberId, [memberOrg.organizationId])
}
})

// Signal after commit so the workflow sees persisted changes
if (!verified) {
await signalMemberUpdate(req.temporal, memberId, {
memberOrganizationIds: [memberOrg.organizationId],
})
}

captureNewState(updatedMemberOrg ?? { ...memberOrg, verified, verifiedBy })
}),
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import commandLineArgs from 'command-line-args'
import { randomUUID } from 'crypto'

import { signalMemberUpdate } from '@crowd/common_services'
import { getDbConnection } from '@crowd/data-access-layer/src/database'
import { getServiceLogger } from '@crowd/logging'
import { getTemporalClient } from '@crowd/temporal'
Expand Down Expand Up @@ -99,26 +99,8 @@ setImmediate(async () => {
try {
log.info(`Processing member: ${member.id}`)

const uuid = randomUUID()

await temporal.workflow.start('memberUpdate', {
taskQueue: 'profiles',
workflowId: `member-update-fix-unaffiliation/${organizationId}/${member.id}/${uuid}`,
retry: {
maximumAttempts: 10,
},
args: [
{
member: {
id: member.id,
},
memberOrganizationIds: [organizationId],
syncToOpensearch: false,
},
],
searchAttributes: {
TenantId: ['875c38bd-2b1b-4e91-ad07-0cfbabb4c49f'], // default tenantId
},
await signalMemberUpdate(temporal, member.id, {
memberOrganizationIds: [organizationId],
})

processedCount++
Expand Down
11 changes: 3 additions & 8 deletions backend/src/services/member/memberAffiliationsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
import { uniq } from 'lodash'

import { Error400, dateIntersects, groupBy } from '@crowd/common'
import { CommonMemberService } from '@crowd/common_services'
import { optionsQx } from '@crowd/data-access-layer'
import { signalMemberUpdate } from '@crowd/common_services'
import { findMaintainerRoles } from '@crowd/data-access-layer/src/maintainers'
import { fetchManySegments } from '@crowd/data-access-layer/src/segments'
import { LoggerBase } from '@crowd/logging'
Expand Down Expand Up @@ -108,12 +107,8 @@ export default class MemberAffiliationsService extends LoggerBase {
this.options,
)

const commonMemberService = new CommonMemberService(
optionsQx(this.options),
this.options.temporal,
this.options.log,
)
await commonMemberService.startAffiliationRecalculation(data.memberId, [])
await signalMemberUpdate(this.options.temporal, data.memberId)

return override
}
}
40 changes: 20 additions & 20 deletions backend/src/services/member/memberOrganizationsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import lodash from 'lodash'
import { Transaction } from 'sequelize'

import { Error404, sanitizeMemberOrganizationDateRange } from '@crowd/common'
import { CommonMemberService } from '@crowd/common_services'
import { signalMemberUpdate } from '@crowd/common_services'
import {
OrganizationField,
changeMemberOrganizationAffiliationOverrides,
Expand All @@ -14,7 +14,6 @@ import {
fetchMemberOrganizationById,
fetchMemberOrganizations,
findMemberAffiliationOverrides,
optionsQx,
queryOrgs,
updateMemberOrganization,
} from '@crowd/data-access-layer'
Expand All @@ -36,16 +35,9 @@ type IOrganizationSummary = Pick<IOrganization, 'id' | 'displayName' | 'logo' |
export default class MemberOrganizationsService extends LoggerBase {
options: IServiceOptions

private readonly commonMemberService: CommonMemberService

constructor(options: IServiceOptions) {
super(options.log)
this.options = options
this.commonMemberService = new CommonMemberService(
optionsQx(options),
options.temporal,
options.log,
)
}

// Member organization list
Expand Down Expand Up @@ -196,13 +188,16 @@ export default class MemberOrganizationsService extends LoggerBase {
])
}

// Start affiliation recalculation within the same transaction
await this.commonMemberService.startAffiliationRecalculation(memberId, [data.organizationId])

// Fetch updated list
const result = await this.list(memberId, transaction)

await SequelizeRepository.commitTransaction(transaction)

// Signal after commit so the workflow sees persisted changes
await signalMemberUpdate(this.options.temporal, memberId, {
memberOrganizationIds: [data.organizationId],
})

return result
} catch (error) {
await SequelizeRepository.rollbackTransaction(transaction)
Expand Down Expand Up @@ -259,11 +254,15 @@ export default class MemberOrganizationsService extends LoggerBase {
new Set([existing.organizationId, data.organizationId]),
).filter((orgId): orgId is string => Boolean(orgId))

await this.commonMemberService.startAffiliationRecalculation(memberId, orgsToRecalculate)

const result = await this.list(memberId, transaction)

await SequelizeRepository.commitTransaction(transaction)

// Signal after commit so the workflow sees persisted changes
await signalMemberUpdate(this.options.temporal, memberId, {
memberOrganizationIds: orgsToRecalculate,
})

return result
} catch (error) {
await SequelizeRepository.rollbackTransaction(transaction)
Expand All @@ -288,15 +287,16 @@ export default class MemberOrganizationsService extends LoggerBase {

await deleteMemberOrganizations(qx, memberId, [id], true)

await this.commonMemberService.startAffiliationRecalculation(
memberId,
[memberOrganizationToBeDeleted.organizationId],
true,
)

const result = await this.list(memberId, transaction)

await SequelizeRepository.commitTransaction(transaction)

// Signal after commit so the workflow sees persisted changes
await signalMemberUpdate(this.options.temporal, memberId, {
memberOrganizationIds: [memberOrganizationToBeDeleted.organizationId],
syncToOpensearch: true,
})

return result
} catch (error) {
await SequelizeRepository.rollbackTransaction(transaction)
Expand Down
13 changes: 4 additions & 9 deletions backend/src/services/memberService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
getGithubInstallationToken,
invalidateMemberQueryCache,
prepareMemberUnmerge,
signalMemberUpdate,
startMemberUnmergeWorkflow,
unmergeMember,
} from '@crowd/common_services'
Expand Down Expand Up @@ -819,16 +820,10 @@ export default class MemberService extends LoggerBase {
// Pass invalidateCache from options to control whether to clear list caches
await invalidateMemberQueryCache(this.options.redis, [id], invalidateCache)

const commonMemberService = new CommonMemberService(
optionsQx(this.options),
this.options.temporal,
this.options.log,
)
await commonMemberService.startAffiliationRecalculation(
id,
(data.organizations || []).map((o) => o.id),
await signalMemberUpdate(this.options.temporal, id, {
memberOrganizationIds: (data.organizations || []).map((o) => o.id),
syncToOpensearch,
)
})

return record
} catch (error) {
Expand Down
Loading
Loading