Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,19 @@
import io.flamingock.internal.common.core.context.ContextResolver;
import io.flamingock.internal.common.core.error.FlamingockException;
import io.flamingock.internal.common.couchbase.CouchbaseUtils;
import io.flamingock.internal.core.builder.FlamingockEdition;
import io.flamingock.internal.core.external.targets.TransactionalTargetSystem;
import io.flamingock.internal.core.external.targets.mark.NoOpTargetSystemAuditMarker;
import io.flamingock.internal.core.transaction.TransactionManager;
import io.flamingock.internal.core.transaction.TransactionWrapper;
import java.util.function.Supplier;

import java.util.Objects;
import java.util.Optional;

import static io.flamingock.internal.common.core.audit.AuditReaderType.MONGOCK;
import static io.flamingock.internal.common.core.metadata.Constants.MONGOCK_IMPORT_ORIGIN_PROPERTY_KEY;
import static io.flamingock.internal.core.builder.FlamingockEdition.COMMUNITY;

public class CouchbaseTargetSystem extends TransactionalTargetSystem<CouchbaseTargetSystem> implements CouchbaseExternalSystem {

Expand Down Expand Up @@ -79,11 +82,18 @@ public void initialize(ContextResolver baseContext) {
targetSystemContext.addDependency(bucket);


TransactionManager<TransactionAttemptContext> txManager = new TransactionManager<>(null); //TODO: update as needed
Supplier<TransactionAttemptContext> couchbaseTxSupplier = () -> {
throw new FlamingockException(
"Couchbase TransactionAttemptContext can only be obtained inside cluster.transactions().run(); "
+ "the wrapper must register the session via TransactionManager.startSession(sessionId, ctx).");
};
TransactionManager<TransactionAttemptContext> txManager = new TransactionManager<>(couchbaseTxSupplier);
txWrapper = new CouchbaseTxWrapper(cluster, txManager);

//TODO: inject marker repository based on edition(baseContext.getDependencyValue(FlamingockEdition.class))
auditMarker = new NoOpTargetSystemAuditMarker(this.getId());
FlamingockEdition edition = baseContext.getDependencyValue(FlamingockEdition.class).orElse(COMMUNITY);
auditMarker = edition == COMMUNITY
? new NoOpTargetSystemAuditMarker(this.getId())
: CouchbaseTargetSystemAuditMarker.builder(cluster, bucket, txManager).build();
}

private void validate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ void afterEach() throws Exception {
//tear down
mockRunnerServer.stop();
CouchbaseCollectionHelper.dropCollectionIfExists(cluster, BUCKET_NAME, SCOPE_NAME, CLIENTS_COLLECTION);
CouchbaseCollectionHelper.dropCollectionIfExists(cluster, BUCKET_NAME, SCOPE_NAME, CommunityPersistenceConstants.DEFAULT_MARKER_STORE_NAME);
}

@Test
Expand Down Expand Up @@ -182,14 +183,12 @@ void happyPath() {

// check clients changes
couchbaseTestHelper.checkCount(bucket.scope(SCOPE_NAME).collection(CLIENTS_COLLECTION), 1);
//TODO add when cloud added
// check ongoing status
// couchbaseTestHelper.checkOngoingChange(ongoingCount -> ongoingCount == 0);
couchbaseTestHelper.checkOngoingChange(ongoingCount -> ongoingCount == 0);
}
}

@Test
@Disabled("adapt when adding cloud support")
@DisplayName("Should rollback the ongoing deletion when a change fails")
void failedChanges() {
String executionId = "execution-1";
Expand Down Expand Up @@ -230,16 +229,15 @@ void failedChanges() {
.build();

//THEN
mockRunnerServer.verifyAllCalls();

OperationException ex = Assertions.assertThrows(OperationException.class, runner::run);

mockRunnerServer.verifyAllCalls();

// check clients changes
couchbaseTestHelper.checkCount(bucket.scope(SCOPE_NAME).collection(CLIENTS_COLLECTION), 0);

//TODO when cloud enabled
// check ongoing status
// couchbaseTestHelper.checkEmptyTargetSystemAuditMarker();
couchbaseTestHelper.checkEmptyTargetSystemAuditMarker();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static io.flamingock.internal.common.core.audit.AuditReaderType.MONGOCK;
import static io.flamingock.internal.common.core.metadata.Constants.DEFAULT_MONGOCK_ORIGIN;
import static io.flamingock.internal.common.core.metadata.Constants.MONGOCK_IMPORT_ORIGIN_PROPERTY_KEY;
import static io.flamingock.internal.core.builder.FlamingockEdition.COMMUNITY;

public class DynamoDBTargetSystem extends TransactionalTargetSystem<DynamoDBTargetSystem> implements DynamoDBExternalSystem {

Expand Down Expand Up @@ -63,14 +64,13 @@ public void initialize(ContextResolver baseContext) {
this.validate();
targetSystemContext.addDependency(client);

FlamingockEdition edition = baseContext.getDependencyValue(FlamingockEdition.class)
.orElse(FlamingockEdition.CLOUD);

TransactionManager<TransactWriteItemsEnhancedRequest.Builder> txManager = new TransactionManager<>(TransactWriteItemsEnhancedRequest::builder);
txWrapper = new DynamoDBTxWrapper(client, txManager);

//TODO: inject marker repository based on edition(baseContext.getDependencyValue(FlamingockEdition.class))
auditMarker = new NoOpTargetSystemAuditMarker(this.getId());
FlamingockEdition edition = baseContext.getDependencyValue(FlamingockEdition.class).orElse(COMMUNITY);
auditMarker = edition == COMMUNITY
? new NoOpTargetSystemAuditMarker(this.getId())
: DynamoDBTargetSystemAuditMarker.builder(client, txManager).build();
}

private void validate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ void afterEach() throws Exception {
//tear down
logger.info("Stopping Mock Server...");
mockRunnerServer.stop();
dynamoDBTestHelper.dropTable(UserEntity.tableName);
dynamoDBTestHelper.dropTable(dynamoDBTestHelper.tableName);
}

@Test
Expand Down Expand Up @@ -168,14 +170,12 @@ void happyPath() {
.table(UserEntity.tableName, TableSchema.fromBean(UserEntity.class)),
1);

//TODO when cloud enabled
// check ongoing status
// dynamoDBTestHelper.checkOngoingChange(ongoingCount -> ongoingCount == 0);
dynamoDBTestHelper.checkOngoingChange(ongoingCount -> ongoingCount == 0);
}
}

@Test
@Disabled("adapt when adding cloud support")
@DisplayName("Should rollback the ongoing deletion when a change fails")
void failedChanges() {
String executionId = "execution-1";
Expand Down Expand Up @@ -216,10 +216,10 @@ void failedChanges() {
.build();

//THEN
mockRunnerServer.verifyAllCalls();

OperationException ex = Assertions.assertThrows(OperationException.class, runner::run);

mockRunnerServer.verifyAllCalls();

// check clients changes
dynamoDBTestHelper.checkCount(
DynamoDbEnhancedClient.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import software.amazon.awssdk.enhanced.dynamodb.model.PutItemEnhancedRequest;
import software.amazon.awssdk.enhanced.dynamodb.model.ScanEnhancedRequest;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;

import java.util.function.Predicate;

Expand All @@ -42,6 +44,13 @@ public boolean tableExists(String tableName) {
return dynamoDBUtil.getDynamoDBClient().listTables().tableNames().contains(tableName);
}

public void dropTable(String tableName) {
try {
dynamoDBUtil.getDynamoDBClient().deleteTable(DeleteTableRequest.builder().tableName(tableName).build());
} catch (ResourceNotFoundException ignored) {
}
}

public DynamoDbClient getDynamoDBClient() {
return dynamoDBUtil.getDynamoDBClient();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright 2025 Flamingock (https://www.flamingock.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.flamingock.targetsystem.mongodb.springdata;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOptions;
import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType;
import io.flamingock.internal.common.mongodb.CollectionInitializator;
import io.flamingock.internal.common.mongodb.MongoDBSyncCollectionHelper;
import io.flamingock.internal.common.mongodb.MongoDBSyncDocumentHelper;
import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMark;
import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMarker;
import io.flamingock.internal.util.constants.CommunityPersistenceConstants;
import org.bson.Document;
import org.springframework.data.mongodb.core.MongoTemplate;

import java.util.HashSet;
import java.util.Set;

/**
* Audit marker for the MongoDB Spring Data target system.
* <p>
* Writes participate in the active Spring-managed MongoDB transaction by going through
* {@link MongoTemplate#getDb()}, which returns a session-aware {@link MongoDatabase} when
* called inside an active {@code MongoTransactionManager} transaction. This avoids exposing
* the underlying {@code ClientSession} through the Flamingock {@code TransactionManager},
* keeping the Spring abstraction intact.
*/
public class MongoDBSpringDataAuditMarker implements TargetSystemAuditMarker {

public static final String OPERATION = "operation";
private static final String CHANGE_ID = "changeId";

private final MongoTemplate mongoTemplate;
private final String collectionName;

public MongoDBSpringDataAuditMarker(MongoTemplate mongoTemplate, String collectionName) {
this.mongoTemplate = mongoTemplate;
this.collectionName = collectionName;
}

public static Builder builder(MongoTemplate mongoTemplate) {
return new Builder(mongoTemplate);
}

public static TargetSystemAuditMark mapToOnGoingStatus(Document document) {
TargetSystemAuditMarkType operation = TargetSystemAuditMarkType.valueOf(document.getString(OPERATION));
return new TargetSystemAuditMark(document.getString(CHANGE_ID), operation);
}

@Override
public Set<TargetSystemAuditMark> listAll() {
return collection().find()
.map(MongoDBSpringDataAuditMarker::mapToOnGoingStatus)
.into(new HashSet<>());
}

@Override
public void clearMark(String changeId) {
collection().deleteMany(Filters.eq(CHANGE_ID, changeId));
}

@Override
public void mark(TargetSystemAuditMark auditMark) {
Document filter = new Document(CHANGE_ID, auditMark.getChangeId());
Document newDocument = new Document(CHANGE_ID, auditMark.getChangeId())
.append(OPERATION, auditMark.getOperation().name());

collection().updateOne(
filter,
new Document("$set", newDocument),
new UpdateOptions().upsert(true));
}

private MongoCollection<Document> collection() {
return mongoTemplate.getDb().getCollection(collectionName);
}

public static class Builder {
private final MongoTemplate mongoTemplate;
private boolean autoCreate = true;
private String collectionName = CommunityPersistenceConstants.DEFAULT_MARKER_STORE_NAME;

public Builder(MongoTemplate mongoTemplate) {
this.mongoTemplate = mongoTemplate;
}

public Builder setCollectionName(String collectionName) {
this.collectionName = collectionName;
return this;
}

public Builder withAutoCreate(boolean autoCreate) {
this.autoCreate = autoCreate;
return this;
}

public MongoDBSpringDataAuditMarker build() {
MongoDatabase db = mongoTemplate.getDb();
MongoCollection<Document> collection = db.getCollection(collectionName);
CollectionInitializator<MongoDBSyncDocumentHelper> initializer = new CollectionInitializator<>(
new MongoDBSyncCollectionHelper(collection),
() -> new MongoDBSyncDocumentHelper(new Document()),
new String[]{CHANGE_ID}
);
if (autoCreate) {
initializer.initialize();
} else {
initializer.justValidateCollection();
}
return new MongoDBSpringDataAuditMarker(mongoTemplate, collectionName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.flamingock.internal.common.core.audit.AuditReaderType;
import io.flamingock.internal.common.core.context.ContextResolver;
import io.flamingock.internal.common.core.error.FlamingockException;
import io.flamingock.internal.core.builder.FlamingockEdition;
import io.flamingock.internal.core.external.targets.mark.NoOpTargetSystemAuditMarker;
import io.flamingock.internal.core.external.targets.TransactionalTargetSystem;
import io.flamingock.internal.core.transaction.TransactionWrapper;
Expand All @@ -36,6 +37,7 @@
import static io.flamingock.internal.common.core.audit.AuditReaderType.MONGOCK;
import static io.flamingock.internal.common.core.metadata.Constants.DEFAULT_MONGOCK_ORIGIN;
import static io.flamingock.internal.common.core.metadata.Constants.MONGOCK_IMPORT_ORIGIN_PROPERTY_KEY;
import static io.flamingock.internal.core.builder.FlamingockEdition.COMMUNITY;


public class MongoDBSpringDataTargetSystem extends TransactionalTargetSystem<MongoDBSpringDataTargetSystem>
Expand Down Expand Up @@ -106,8 +108,10 @@ public void initialize(ContextResolver baseContext) {
.writeConcern(writeConcern)
.build();

//TODO: inject marker repository based on edition(baseContext.getDependencyValue(FlamingockEdition.class))
auditMarker = new NoOpTargetSystemAuditMarker(this.getId());
FlamingockEdition edition = baseContext.getDependencyValue(FlamingockEdition.class).orElse(COMMUNITY);
auditMarker = edition == COMMUNITY
? new NoOpTargetSystemAuditMarker(this.getId())
: MongoDBSpringDataAuditMarker.builder(mongoTemplate).build();
}

private void validate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.flamingock.common.test.cloud.prototype.PrototypeClientSubmission;
import io.flamingock.common.test.cloud.prototype.PrototypeStage;
import io.flamingock.internal.util.Trio;
import io.flamingock.internal.util.constants.CommunityPersistenceConstants;
import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType;
import io.flamingock.internal.core.builder.FlamingockFactory;
import io.flamingock.internal.core.builder.CloudChangeRunnerBuilder;
Expand Down Expand Up @@ -127,6 +128,7 @@ void afterEach() throws Exception {
mockRunnerServer.stop();

testDatabase.getCollection(CLIENTS_COLLECTION).drop();
testDatabase.getCollection(CommunityPersistenceConstants.DEFAULT_MARKER_STORE_NAME).drop();
}

@Test
Expand Down Expand Up @@ -180,7 +182,6 @@ void happyPath() {
}

@Test
@Disabled("adapt when adding cloud support")
@DisplayName("Should rollback the ongoing deletion when a change fails")
void failedChanges() {
String executionId = "execution-1";
Expand Down Expand Up @@ -221,16 +222,15 @@ void failedChanges() {
.build();

//THEN
mockRunnerServer.verifyAllCalls();

OperationException ex = Assertions.assertThrows(OperationException.class, runner::run);

mockRunnerServer.verifyAllCalls();

// check clients changes
mongoDBTestHelper.checkCount(testDatabase.getCollection(CLIENTS_COLLECTION), 0);

//TODO when cloud enabled
// check ongoing status
// mongoDBTestHelper.checkEmptyTargetSystemAudiMarker();
mongoDBTestHelper.checkEmptyTargetSystemAudiMarker();
}
}

Expand Down
Loading
Loading