Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions jooq-dialect/src/main/java/org/jooq/impl/BatchReplace.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package org.jooq.impl;

import org.jooq.BatchBindStep;
import org.jooq.Field;
import org.jooq.TableRecord;
import org.jooq.exception.DataAccessException;
import org.reactivestreams.Subscriber;
import tech.ydb.jooq.ReplaceQuery;
import tech.ydb.jooq.YdbDSLContext;

public class BatchReplace extends AbstractBatch {
private final YdbDSLContext ydbDSLContext;
private final TableRecord<?>[] records;

public BatchReplace(YdbDSLContext ydbDSLContext, TableRecord<?>[] records) {
super(ydbDSLContext.configuration());
this.ydbDSLContext = ydbDSLContext;
this.records = records;
}

@Override
public int[] execute() throws DataAccessException {
BatchBindStep batchBindStep = prepareBatch();
if (batchBindStep == null) {
return new int[0];
}
int[] result = batchBindStep.execute();
updateChangedFlag();
return result;
}

@Override
public int size() {
return records.length;
}

@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
throw new UnsupportedOperationException("BatchUpsert operation is not supported in a reactive way");
}

private BatchBindStep prepareBatch() {
BatchBindStep batch = null;

for (TableRecord<?> record : records) {
ReplaceQuery<?> upsertQuery = ydbDSLContext.replaceQuery(record.getTable());

for (Field<?> field : record.fields()) {
upsertQuery.addValue((Field<Object>) field, record.get(field));
}

if (batch == null) {
batch = ydbDSLContext.batch(upsertQuery);
}

batch.bind(upsertQuery.getBindValues().toArray());
}

return batch;
}

private void updateChangedFlag() {
for (TableRecord<?> record : records) {
if (record instanceof AbstractRecord r) {
r.fetched = true;
}
}
}
}
7 changes: 7 additions & 0 deletions jooq-dialect/src/main/java/tech/ydb/jooq/YdbDSLContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,11 @@ public interface YdbDSLContext extends DSLContext {

@CheckReturnValue
<R extends Record> ReplaceValuesStepN<R> replaceInto(Table<R> into, Collection<? extends Field<?>> fields);


@CheckReturnValue
Batch batchReplace(TableRecord<?>... records);

@CheckReturnValue
Batch batchReplace(Collection<? extends TableRecord<?>> records);
}
Original file line number Diff line number Diff line change
Expand Up @@ -340,4 +340,14 @@ public <R extends Record> ReplaceValuesStepN<R> replaceInto(Table<R> into, Field
public <R extends Record> ReplaceValuesStepN<R> replaceInto(Table<R> into, Collection<? extends Field<?>> fields) {
return new ReplaceImpl<>(configuration(), into, fields);
}

@Override
public Batch batchReplace(TableRecord<?>... records) {
return new BatchReplace(this, records);
}

@Override
public Batch batchReplace(Collection<? extends TableRecord<?>> records) {
return batchReplace(records.toArray(TableRecord[]::new));
}
}
66 changes: 66 additions & 0 deletions jooq-dialect/src/test/java/tech/ydb/jooq/BatchReplaceTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package tech.ydb.jooq;

import java.util.List;

import jooq.generated.ydb.default_schema.tables.records.SeriesRecord;
import org.jooq.Result;
import org.jooq.types.ULong;
import org.junit.jupiter.api.Test;

import static jooq.generated.ydb.default_schema.Tables.SERIES;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

public class BatchReplaceTest extends BaseTest {

@Test
public void testSimpleInsertViaBatchReplace() {
SeriesRecord newRecord = new SeriesRecord();
newRecord.setSeriesId(ULong.valueOf(1));
newRecord.setTitle("New Series");
newRecord.setSeriesInfo("Info about the new series");
newRecord.setReleaseDate(ULong.valueOf(20220101));

dsl.batchReplace(newRecord)
.execute();

Result<SeriesRecord> upsertedRecord = dsl.selectFrom(SERIES)
.where(SERIES.SERIES_ID.eq(ULong.valueOf(1)))
.fetch();

assertEquals(List.of(newRecord), upsertedRecord);
}

@Test
public void testSimpleUpdateViaBatchReplace() {
dsl.batchInsert(getExampleRecords()).execute();

SeriesRecord replaceRecord = new SeriesRecord(ULong.valueOf(1), "Updated Series One", "Updated Info One",
ULong.valueOf(20220201));

dsl.batchReplace(replaceRecord)
.execute();

SeriesRecord updatedRecord = dsl.selectFrom(SERIES)
.where(SERIES.SERIES_ID.eq(ULong.valueOf(1)))
.fetchOne();

assertNotNull(updatedRecord);
assertEquals("Updated Series One", updatedRecord.getTitle());
assertEquals("Updated Info One", updatedRecord.getSeriesInfo());
assertEquals(ULong.valueOf(20220201), updatedRecord.getReleaseDate());
}

@Test
public void testMultipleInsertViaBatchReplace() {
List<SeriesRecord> records = getExampleRecords();

dsl.batchReplace(records)
.execute();

Result<SeriesRecord> upsertedRecords = dsl.selectFrom(SERIES).fetch();

assertEquals(records, upsertedRecords);
}

}