Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 26 additions & 64 deletions extensions/mongoProcessor/MongoQueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const errors = require('arsenal').errors;
const { replicationBackends, emptyFileMd5 } = require('arsenal').constants;
const MongoClient = require('arsenal').storage
.metadata.mongoclient.MongoClientInterface;
const { ObjectMD } = require('arsenal').models;
const { ObjectMD, ReplicationConfiguration } = require('arsenal').models;
const { VersionID } = require('arsenal').versioning;
const { extractVersionId } = require('../../lib/util/versioning');

Expand Down Expand Up @@ -234,24 +234,6 @@ class MongoQueueProcessor {
});
}

/**
* get dataStoreVersionId, if exists
* @param {ObjectMDData} objMd - object md fetched from mongo
* @param {String} site - storage location name
* @return {String} dataStoreVersionId
*/
_getDataStoreVersionId(objMd, site) {
let dataStoreVersionId = '';
if (objMd.replicationInfo && objMd.replicationInfo.backends) {
const backend = objMd.replicationInfo.backends
.find(l => l.site === site);
if (backend && backend.dataStoreVersionId) {
dataStoreVersionId = backend.dataStoreVersionId;
}
}
return dataStoreVersionId;
}

/**
* Update ingested entry metadata fields: owner-id, owner-display-name
* @param {ObjectQueueEntry} entry - object queue entry object
Expand Down Expand Up @@ -340,53 +322,33 @@ class MongoQueueProcessor {
const objectMDModel = new ObjectMD();
entry.setReplicationInfo(objectMDModel.getReplicationInfo());

// TODO: refactor based off cloudserver getReplicationInfo
if (bucketRepInfo) {
const { role, destination, rules } = bucketRepInfo;
const rule = rules.find(r =>
(entry.getObjectKey().startsWith(r.prefix) && r.enabled));

if (rule) {
const replicationInfo = {};
const storageTypes = [];
const backends = [];
const storageClasses = rule.storageClass.split(',');

storageClasses.forEach(storageClass => {
const storageClassName =
storageClass.endsWith(':preferred_read') ?
storageClass.split(':')[0] : storageClass;
const location = this._bootstrapList.find(l =>
(l.site === storageClassName));
if (location && replicationBackends[location.type]) {
storageTypes.push(location.type);
}
let dataStoreVersionId = '';
if (zenkoObjMd) {
dataStoreVersionId = this._getDataStoreVersionId(
zenkoObjMd, storageClassName);
}
backends.push({
site: storageClassName,
status: 'PENDING',
dataStoreVersionId,
});
});
if (!bucketRepInfo) {
return;
}

// save updated replication info
replicationInfo.status = 'PENDING';
replicationInfo.backends = backends;
replicationInfo.content = content;
replicationInfo.destination = destination;
replicationInfo.storageClass = storageClasses.join(',');
replicationInfo.role = role;
replicationInfo.storageType = storageTypes.join(',');
replicationInfo.isNFS = bucketInfo.isNFS();

// apply changes
entry.setReplicationInfo(replicationInfo);
}
const isCloud = site => {
const location = this._bootstrapList.find(l => l.site === site);
return !!(location && replicationBackends[location.type]);
};

const backends = ReplicationConfiguration.resolveBackends(
bucketRepInfo,
entry.getObjectKey(),
isCloud,
zenkoObjMd?.replicationInfo?.backends,
);

if (backends.length === 0) {
return;
}

entry.setReplicationInfo({
status: 'PENDING',
backends,
content,
role: ReplicationConfiguration.resolveSourceRole(bucketRepInfo.role),
isNFS: bucketInfo.isNFS(),
});
}

/**
Expand Down
8 changes: 3 additions & 5 deletions extensions/replication/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ const QueueEntry = require('../../../lib/models/QueueEntry');
const TaskScheduler = require('../../../lib/tasks/TaskScheduler');
const { getTaskSchedulerQueueKey,
getTaskSchedulerDedupeKey } = require('./taskSchedulerHelpers');
const getLocationsFromStorageClass =
require('../utils/getLocationsFromStorageClass');
const ReplicateObject = require('../tasks/ReplicateObject');
const MultipleBackendTask = require('../tasks/MultipleBackendTask');
const CopyLocationTask = require('../tasks/CopyLocationTask');
Expand Down Expand Up @@ -881,9 +879,9 @@ class QueueProcessor extends EventEmitter {
}
// ignore bucket entry if echo mode disabled
} else if (sourceEntry instanceof ObjectQueueEntry) {
const replicationStorageClass =
sourceEntry.getReplicationStorageClass();
const sites = getLocationsFromStorageClass(replicationStorageClass);
const sites = sourceEntry.getReplicationBackends()
.filter(b => b.status === 'PENDING')
.map(b => b.site);
if (sites.includes(this.site)) {
if (this.destConfig.replicationEndpoint &&
replicationBackends.includes(this.destConfig.replicationEndpoint.type)) {
Expand Down
16 changes: 8 additions & 8 deletions extensions/replication/tasks/MultipleBackendTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ class MultipleBackendTask extends ReplicateObject {
const command = new MultipleBackendAbortMPUCommand({
Bucket: sourceEntry.getBucket(),
Key: sourceEntry.getObjectKey(),
StorageType: sourceEntry.getReplicationStorageType(),
StorageType: this._getReplicationEndpointType(),
StorageClass: this.site,
UploadId: uploadId,
RequestUids: log.getSerializedUids(),
Expand Down Expand Up @@ -353,7 +353,7 @@ class MultipleBackendTask extends ReplicateObject {
const command = new MultipleBackendCompleteMPUCommand({
Bucket: sourceEntry.getBucket(),
Key: sourceEntry.getObjectKey(),
StorageType: sourceEntry.getReplicationStorageType(),
StorageType: this._getReplicationEndpointType(),
StorageClass: this.site,
VersionId: sourceEntry.getEncodedVersionId() || 'null',
UserMetaData: sourceEntry.getUserMetadata(),
Expand Down Expand Up @@ -445,7 +445,7 @@ class MultipleBackendTask extends ReplicateObject {
const command = new MultipleBackendPutMPUPartCommand({
Bucket: sourceEntry.getBucket(),
Key: sourceEntry.getObjectKey(),
StorageType: sourceEntry.getReplicationStorageType(),
StorageType: this._getReplicationEndpointType(),
StorageClass: this.site,
PartNumber: partNumber,
UploadId: uploadId,
Expand Down Expand Up @@ -506,7 +506,7 @@ class MultipleBackendTask extends ReplicateObject {
const command = new MultipleBackendInitiateMPUCommand({
Bucket: sourceEntry.getBucket(),
Key: sourceEntry.getObjectKey(),
StorageType: sourceEntry.getReplicationStorageType(),
StorageType: this._getReplicationEndpointType(),
StorageClass: this.site,
VersionId: sourceEntry.getEncodedVersionId() || 'null',
UserMetaData: sourceEntry.getUserMetadata(),
Expand Down Expand Up @@ -951,7 +951,7 @@ class MultipleBackendTask extends ReplicateObject {
Key: sourceEntry.getObjectKey(),
CanonicalID: sourceEntry.getOwnerId(),
ContentMD5: sourceEntry.getContentMd5(),
StorageType: sourceEntry.getReplicationStorageType(),
StorageType: this._getReplicationEndpointType(),
StorageClass: this.site,
VersionId: sourceEntry.getEncodedVersionId() || 'null',
UserMetaData: sourceEntry.getUserMetadata(),
Expand Down Expand Up @@ -1008,7 +1008,7 @@ class MultipleBackendTask extends ReplicateObject {
const command = new MultipleBackendPutObjectTaggingCommand({
Bucket: sourceEntry.getBucket(),
Key: sourceEntry.getObjectKey(),
StorageType: sourceEntry.getReplicationStorageType(),
StorageType: this._getReplicationEndpointType(),
StorageClass: this.site,
DataStoreVersionId:
sourceEntry.getReplicationSiteDataStoreVersionId(this.site),
Expand Down Expand Up @@ -1065,7 +1065,7 @@ class MultipleBackendTask extends ReplicateObject {
const command = new MultipleBackendDeleteObjectTaggingCommand({
Bucket: sourceEntry.getBucket(),
Key: sourceEntry.getObjectKey(),
StorageType: sourceEntry.getReplicationStorageType(),
StorageType: this._getReplicationEndpointType(),
StorageClass: this.site,
DataStoreVersionId:
sourceEntry.getReplicationSiteDataStoreVersionId(this.site),
Expand Down Expand Up @@ -1146,7 +1146,7 @@ class MultipleBackendTask extends ReplicateObject {
const command = new MultipleBackendDeleteObjectCommand({
Bucket: sourceEntry.getBucket(),
Key: sourceEntry.getObjectKey(),
StorageType: sourceEntry.getReplicationStorageType(),
StorageType: this._getReplicationEndpointType(),
StorageClass: this.site,
RequestUids: log.getSerializedUids(),
});
Expand Down
33 changes: 26 additions & 7 deletions extensions/replication/tasks/ReplicateObject.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const { S3Client, GetBucketReplicationCommand, GetObjectCommand } = require('@aw
const errors = require('arsenal').errors;
const jsutil = require('arsenal').jsutil;
const ObjectMDLocation = require('arsenal').models.ObjectMDLocation;
const ReplicationConfiguration = require('arsenal').models.ReplicationConfiguration;

const ClientManager = require('../../../lib/clients/ClientManager');
const BackbeatMetadataProxy = require('../../../lib/BackbeatMetadataProxy');
Expand Down Expand Up @@ -227,7 +228,7 @@ class ReplicateObject extends BackbeatTask {
_setupRolesOnce(entry, log, cb) {
log.debug('getting bucket replication',
{ entry: entry.getLogInfo() });
const entryRolesString = entry.getReplicationRoles();
const entryRolesString = entry.getReplicationRoles(this.site);
let entryRoles;
if (entryRolesString !== undefined) {
entryRoles = entryRolesString.split(',');
Expand Down Expand Up @@ -266,9 +267,9 @@ class ReplicateObject extends BackbeatTask {
'replication disabled for object'));
}
const roles = data.ReplicationConfiguration.Role.split(',');
if (roles.length !== 2) {
log.error('expecting two roles separated by a ' +
'comma in bucket replication configuration',
if (roles.length < 1 || roles.length > 2) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: roles.length < 1 is unreachable — String.split(',') always returns at least one element (''.split(',')['']). Could simplify to roles.length > 2.

Suggested change
if (roles.length < 1 || roles.length > 2) {
if (roles.length > 2) {

— Claude Code

log.error('expecting one or two roles in bucket ' +
'replication configuration',
{
method: 'ReplicateObject._setupRolesOnce',
entry: entry.getLogInfo(),
Expand All @@ -287,18 +288,36 @@ class ReplicateObject extends BackbeatTask {
});
return cb(errors.BadRole);
}
if (roles[1] !== entryRoles[1]) {
// Multi-destination CRR: derive the expected destination
// role for this site from the matching rule's Account
// override; legacy configs without Account fall back to
// the literal two-comma role equality check.
const matchingRule = data.ReplicationConfiguration.Rules.find(
rule => rule.Status === 'Enabled' &&
entry.getObjectKey().startsWith(rule.Prefix) &&
rule.Destination &&
rule.Destination.StorageClass === this.site);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rules.find() picks the first matching rule in the API response, but MongoQueueProcessor._updateReplicationInfo selects backends from the highest-priority rule (via .sort((a, b) => (b.priority || 0) - (a.priority || 0))). If multiple enabled rules match the same object prefix and target the same StorageClass with different Destination.Account values, and the API returns them in non-priority order, the expectedDestRole derived here may differ from the role stored in the entry, causing a spurious BadRole rejection.

— Claude Code

let expectedDestRole;
if (matchingRule && matchingRule.Destination.Account) {
expectedDestRole = ReplicationConfiguration
.resolveDestinationRole(
data.ReplicationConfiguration.Role,
matchingRule.Destination.Account);
} else {
expectedDestRole = roles[1];
}
if (expectedDestRole !== entryRoles[1]) {
log.error('role in replication entry for target does ' +
'not match role in bucket replication configuration ',
{
method: 'ReplicateObject._setupRolesOnce',
entry: entry.getLogInfo(),
entryRole: entryRoles[1],
bucketRole: roles[1],
bucketRole: expectedDestRole,
});
return cb(errors.BadRole);
}
return cb(null, roles[0], roles[1]);
return cb(null, entryRoles[0], entryRoles[1]);
})
.catch(err => {
// eslint-disable-next-line no-param-reassign
Expand Down
10 changes: 0 additions & 10 deletions extensions/replication/utils/getLocationsFromStorageClass.js

This file was deleted.

3 changes: 1 addition & 2 deletions lib/models/ObjectQueueEntry.js
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ class ObjectQueueEntry extends ObjectMD {
const newEntry = this.clone();
newEntry
.setAccountId(this.getAccountId())
.setBucket(this.getReplicationTargetBucket())
.setBucket(this.getReplicationTargetBucket(site))
.setReplicationSiteStatus(site, 'REPLICA')
.setReplicationStatus('REPLICA');
return newEntry;
Expand Down Expand Up @@ -218,7 +218,6 @@ class ObjectQueueEntry extends ObjectMD {
.setAccountId(this.getAccountId())
.setReplicationBackends(this.getReplicationBackends().filter(o => o.site === site))
.setReplicationSiteStatus(site, 'PENDING')
.setReplicationStorageClass(site)
.setReplicationStatus('PENDING');
}

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
"@scality/cloudserverclient": "^1.0.8",
"@smithy/node-http-handler": "^3.3.3",
"JSONStream": "^1.3.5",
"arsenal": "git+https://github.com/scality/arsenal#8.3.9",
"arsenal": "git+https://github.com/scality/arsenal#21b9bb33ad77d21609a690a5709a645eab1a95d7",
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arsenal is pinned to a raw commit SHA instead of a tag. The yarn.lock resolves this to version 8.4.2 — if the tag exists, pin to 8.4.2 for readability and consistency with other git-based deps in this file (e.g. breakbeat#v1.0.3, werelogs#8.2.2).

— Claude Code

"async": "^2.3.0",
"backo": "^1.1.0",
"breakbeat": "scality/breakbeat#v1.0.3",
Expand Down
Binary file modified tests/unit/lib/models/ObjectQueueEntry.spec.js
Binary file not shown.
Loading
Loading